Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8091c2c

Browse files
knizhnikkelvich
authored andcommitted
Add arbitrator
1 parent 75acab9 commit 8091c2c

File tree

5 files changed

+1194
-1046
lines changed

5 files changed

+1194
-1046
lines changed

arbitrator/arbitrator.cpp

+123
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#include <time.h>
2+
#include <stdio.h>
3+
#include <stdarg.h>
4+
#include <stdlib.h>
5+
#include <inttypes.h>
6+
#include <sys/time.h>
7+
#include <pthread.h>
8+
9+
#include <string>
10+
#include <vector>
11+
12+
#include <pqxx/connection>
13+
#include <pqxx/transaction>
14+
#include <pqxx/subtransaction.hxx>
15+
#include <pqxx/nontransaction>
16+
#include <pqxx/pipeline>
17+
18+
typedef unsigned long long ulong64; /* we are not using uint64 here because we want to use %lld format for this type */
19+
20+
typedef ulong64 nodemask_t;
21+
22+
#define BIT_CHECK(mask, bit) (((mask) & ((nodemask_t)1 << (bit))) != 0)
23+
#define BIT_CLEAR(mask, bit) (mask &= ~((nodemask_t)1 << (bit)))
24+
#define BIT_SET(mask, bit) (mask |= ((nodemask_t)1 << (bit)))
25+
26+
using namespace std;
27+
using namespace pqxx;
28+
29+
template<class T>
30+
class my_unique_ptr
31+
{
32+
T* ptr;
33+
34+
public:
35+
my_unique_ptr(T* p = NULL) : ptr(p) {}
36+
~my_unique_ptr() { delete ptr; }
37+
T& operator*() { return *ptr; }
38+
T* operator->() { return ptr; }
39+
void operator=(T* p) { ptr = p; }
40+
void operator=(my_unique_ptr& other) {
41+
ptr = other.ptr;
42+
other.ptr = NULL;
43+
}
44+
};
45+
46+
int main (int argc, char* argv[])
47+
{
48+
vector<string> connection_strings;
49+
50+
if (argc == 1){
51+
printf("Use -h to show usage options\n");
52+
return 1;
53+
}
54+
55+
for (int i = 1; i < argc; i++) {
56+
if (argv[i][0] == '-') {
57+
switch (argv[i][1]) {
58+
case 't':
59+
cfs.timeout = atoi(argv[++i]);
60+
continue;
61+
case 'c':
62+
cfg.connections.push_back(string(argv[++i]));
63+
continue;
64+
}
65+
}
66+
printf("Options:\n"
67+
"\t-t TIMEOUT\ttimeout in seconds of waiting database connection string\n"
68+
"\t-c STR\tdatabase connection string\n");
69+
return 1;
70+
}
71+
72+
size_t nConns = connection_strings.size();
73+
vector< my_unique_ptr<connection> > conns(nConns);
74+
for (size_t i = 0; i < nConns; i++) {
75+
conns[i] = new connection(connection_strings[i]);
76+
}
77+
nodemask_t disabledMask = 0;
78+
nodemask_t enabledMask = 0;
79+
80+
while (true) {
81+
vector< my_unique_ptr<nontransaction> > txns(conns.size());
82+
vector< my_unique_ptr<pipeline> > pipes(nConns);
83+
vector<pipeline::query_id> queries(nConns);
84+
char sql[128];
85+
sprintf(sql, "select mtm.arbitrator_poll(%lld)", disabledMask);
86+
87+
for (size_t i = 0; i < nConns; i++) {
88+
if (BIT_CHECK(disabledMask, i)) {
89+
if (BIT_CHECK(enabledMask, i)) {
90+
try {
91+
delete conns[i];
92+
conns[i] = new connection(connection_strings[i]);
93+
BIT_CLEAR(disabledMask, i);
94+
} catch (pqxx_exception const& x) {
95+
conns[i] = NULL;
96+
fprintf(stderr, "Failed to connect to node %d: %s\n", (int)i+1, x.base().what());
97+
}
98+
}
99+
}
100+
if (!BIT_CHECK(disabledMask, i)) {
101+
txns[i] = new nontransaction(*conns[i]);
102+
pipes[i] = new pipeline(*txns[i]);
103+
queries[i] = pipes[i]->insert(sql);
104+
}
105+
sleep(cfg.timeout);
106+
enabledMask = disabledMask;
107+
for (size_t i = 0; i < nConns; i++) {
108+
if (!BIT_CHECK(didsabledMask, i)) {
109+
if (!pipes[i]->is_finished(queries[i]))
110+
{
111+
fprintf(stderr, "Doesn't receive response from node %d within %d seconds\n", (int)i+1, cfs.timeout);
112+
BIT_SET(disabledMask, i);
113+
delete conns[i];
114+
conns[i] = NULL;
115+
} else {
116+
result r = pipes[i]->retrieve(results[i]);
117+
enabledMask &= ~r[0][0].as(nodemask_t());
118+
}
119+
}
120+
}
121+
}
122+
}
123+
}

multimaster--1.0.sql

+4
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ CREATE FUNCTION mtm.check_deadlock(xid bigint) RETURNS boolean
9191
AS 'MODULE_PATHNAME','mtm_check_deadlock'
9292
LANGUAGE C;
9393

94+
CREATE FUNCTION mtm.arbitraror_poll_status(xid bigint) RETURNS bigint
95+
AS 'MODULE_PATHNAME','mtm_arbitrator_poll'
96+
LANGUAGE C;
97+
9498
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
9599

96100
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS

0 commit comments

Comments
 (0)