Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b99ad93

Browse files
knizhnikkelvich
authored and committed
Support table copy between multimaster nodes
1 parent b85c8af commit b99ad93

9 files changed

+196
-42
lines changed

arbitrator/arbitrator.cpp

+71-36
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <stdio.h>
33
#include <stdarg.h>
44
#include <stdlib.h>
5+
#include <unistd.h>
56
#include <inttypes.h>
67
#include <sys/time.h>
78
#include <pthread.h>
@@ -34,18 +35,34 @@ class my_unique_ptr
3435
public:
3536
my_unique_ptr(T* p = NULL) : ptr(p) {}
3637
~my_unique_ptr() { delete ptr; }
37-
T& operator*() { return *ptr; }
38-
T* operator->() { return ptr; }
39-
void operator=(T* p) { ptr = p; }
38+
T& operator*() const { return *ptr; }
39+
T* operator->() const { return ptr; }
40+
bool operator ==(T const* p) const { return ptr == p; }
41+
bool operator !=(T const* p) const { return ptr != p; }
42+
void operator=(T* p) {
43+
delete ptr;
44+
ptr = p;
45+
}
4046
void operator=(my_unique_ptr& other) {
47+
delete ptr;
4148
ptr = other.ptr;
4249
other.ptr = NULL;
4350
}
4451
};
4552

53+
struct config
54+
{
55+
int timeout;
56+
vector<string> connections;
57+
58+
config() {
59+
timeout = 1000000; // 1 second
60+
}
61+
};
62+
4663
int main (int argc, char* argv[])
4764
{
48-
vector<string> connection_strings;
65+
config cfg;
4966

5067
if (argc == 1){
5168
printf("Use -h to show usage options\n");
@@ -56,26 +73,27 @@ int main (int argc, char* argv[])
5673
if (argv[i][0] == '-') {
5774
switch (argv[i][1]) {
5875
case 't':
59-
cfs.timeout = atoi(argv[++i]);
76+
cfg.timeout = atoi(argv[++i]);
6077
continue;
6178
case 'c':
6279
cfg.connections.push_back(string(argv[++i]));
6380
continue;
6481
}
6582
}
6683
printf("Options:\n"
67-
"\t-t TIMEOUT\ttimeout in seconds of waiting database connection string\n"
84+
"\t-t TIMEOUT\ttimeout in microseconds of waiting database connection string (default: 1 second)\n"
6885
"\t-c STR\tdatabase connection string\n");
6986
return 1;
7087
}
7188

72-
size_t nConns = connection_strings.size();
89+
size_t nConns = cfg.connections.size();
7390
vector< my_unique_ptr<connection> > conns(nConns);
7491
for (size_t i = 0; i < nConns; i++) {
75-
conns[i] = new connection(connection_strings[i]);
92+
conns[i] = new connection(cfg.connections[i]);
7693
}
7794
nodemask_t disabledMask = 0;
78-
nodemask_t enabledMask = 0;
95+
nodemask_t newEnabledMask = 0;
96+
nodemask_t oldEnabledMask = 0;
7997

8098
while (true) {
8199
vector< my_unique_ptr<nontransaction> > txns(conns.size());
@@ -84,46 +102,63 @@ int main (int argc, char* argv[])
84102
char sql[128];
85103
sprintf(sql, "select mtm.arbitrator_poll(%lld)", disabledMask);
86104

105+
// Initiate queries to all live nodes
87106
for (size_t i = 0; i < nConns; i++) {
88-
if (BIT_CHECK(disabledMask, i)) {
89-
if (BIT_CHECK(enabledMask, i)) {
107+
// Some of live node reestablished connection with dead node, so arbitrator should also try to connect to this node
108+
if (conns[i] == NULL) {
109+
if (BIT_CHECK(newEnabledMask, i)) {
90110
try {
91-
delete conns[i];
92-
conns[i] = new connection(connection_strings[i]);
111+
conns[i] = new connection(cfg.connections[i]);
93112
BIT_CLEAR(disabledMask, i);
113+
fprintf(stdout, "Reestablish connection with node %d\n", (int)i+1);
94114
} catch (pqxx_exception const& x) {
95-
conns[i] = NULL;
96-
fprintf(stderr, "Failed to connect to node %d: %s\n", (int)i+1, x.base().what());
115+
if (conns[i] == NULL) {
116+
conns[i] = NULL;
117+
fprintf(stderr, "Failed to connect to node %d: %s\n", (int)i+1, x.base().what());
118+
}
97119
}
98120
}
99-
}
100-
if (!BIT_CHECK(disabledMask, i)) {
121+
} else {
101122
txns[i] = new nontransaction(*conns[i]);
102123
pipes[i] = new pipeline(*txns[i]);
103124
queries[i] = pipes[i]->insert(sql);
104125
}
105-
sleep(cfg.timeout);
106-
enabledMask = 0;
107-
for (size_t i = 0; i < nConns; i++) {
108-
if (!BIT_CHECK(didsabledMask, i)) {
109-
if (!pipes[i]->is_finished(queries[i]))
110-
{
111-
fprintf(stderr, "Doesn't receive response from node %d within %d seconds\n", (int)i+1, cfs.timeout);
112-
BIT_SET(disabledMask, i);
113-
delete conns[i];
126+
}
127+
// Wait some time
128+
usleep(cfg.timeout);
129+
oldEnabledMask = newEnabledMask;
130+
newEnabledMask = ~0;
131+
for (size_t i = 0; i < nConns; i++) {
132+
if (!BIT_CHECK(disabledMask, i)) {
133+
if (!pipes[i]->is_finished(queries[i])) {
134+
fprintf(stderr, "Doesn't receive response from node %d within %d microseconds\n", (int)i+1, cfg.timeout);
135+
BIT_SET(disabledMask, i);
136+
conns[i] = NULL;
137+
} else {
138+
try {
139+
result r = pipes[i]->retrieve(queries[i]);
140+
newEnabledMask &= r[0][0].as(nodemask_t());
141+
} catch (pqxx_exception const& x) {
114142
conns[i] = NULL;
115-
} else {
116-
try {
117-
result r = pipes[i]->retrieve(results[i]);
118-
enabledMask |= r[0][0].as(nodemask_t());
119-
} catch (pqxx_exception const& x) {
120-
delete conns[i];
121-
conns[i] = NULL;
122-
fprintf(stderr, "Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
123-
}
124-
}
143+
fprintf(stderr, "Failed to retrieve result from node %d: %s\n", (int)i+1, x.base().what());
144+
}
125145
}
126146
}
127147
}
148+
if (newEnabledMask == ~0) {
149+
if (oldEnabledMask != ~0) {
150+
fprintf(stdout, "There are no more live nodes\n");
151+
}
152+
// No live nodes:
153+
disabledNodeMask = 0;
154+
} else {
155+
if (newEnabledMask != oldEnabledMask) {
156+
for (size_t i = 0; i < nConns; i++) {
157+
if (BIT_CHECK(newEnabledMask ^ oldEnabledMask, i)) {
158+
fprintf(stdout, "Node %d is %s\n", (int)i+1, BIT_CHECK(newEnabledMask, i) ? "enabled" : "disabled");
159+
}
160+
}
161+
}
162+
}
128163
}
129164
}

multimaster--1.0.sql

+8
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,14 @@ CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
7575
AS 'MODULE_PATHNAME','mtm_make_table_local'
7676
LANGUAGE C;
7777

78+
CREATE FUNCTION mtm.broadcast_table(srcTable regclass, dstNodesMask bigint) RETURNS void
79+
AS 'MODULE_PATHNAME','mtm_broadcast_table'
80+
LANGUAGE C;
81+
82+
CREATE FUNCTION mtm.copy_table(srcTable regclass, dstNode integer) RETURNS void
83+
AS 'MODULE_PATHNAME','mtm_copy_table'
84+
LANGUAGE C;
85+
7886
CREATE FUNCTION mtm.dump_lock_graph() RETURNS text
7987
AS 'MODULE_PATHNAME','mtm_dump_lock_graph'
8088
LANGUAGE C;

multimaster.c

+20
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,8 @@ PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
129129
PG_FUNCTION_INFO_V1(mtm_inject_2pc_error);
130130
PG_FUNCTION_INFO_V1(mtm_check_deadlock);
131131
PG_FUNCTION_INFO_V1(mtm_arbitrator_poll);
132+
PG_FUNCTION_INFO_V1(mtm_broadcast_table);
133+
PG_FUNCTION_INFO_V1(mtm_copy_table);
132134

133135
static Snapshot MtmGetSnapshot(Snapshot snapshot);
134136
static void MtmInitialize(void);
@@ -4382,6 +4384,24 @@ mtm_collect_cluster_info(PG_FUNCTION_ARGS)
43824384
}
43834385
}
43844386

4387+
Datum mtm_broadcast_table(PG_FUNCTION_ARGS)
4388+
{
4389+
MtmCopyRequest copy;
4390+
copy.sourceTable = PG_GETARG_OID(0);
4391+
copy.targetNodes = PG_GETARG_INT64(1);
4392+
LogLogicalMessage("B", (char*)&copy, sizeof(copy), false);
4393+
PG_RETURN_VOID();
4394+
}
4395+
4396+
Datum mtm_copy_table(PG_FUNCTION_ARGS)
4397+
{
4398+
MtmCopyRequest copy;
4399+
copy.sourceTable = PG_GETARG_OID(0);
4400+
copy.targetNodes = (nodemask_t)1 << (PG_GETARG_INT32(1) - 1);
4401+
LogLogicalMessage("B", (char*)&copy, sizeof(copy), false);
4402+
PG_RETURN_VOID();
4403+
}
4404+
43854405

43864406
Datum mtm_make_table_local(PG_FUNCTION_ARGS)
43874407
{

multimaster.h

+7
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,13 @@ typedef struct
207207
} MtmConnectionInfo;
208208

209209

210+
typedef struct
211+
{
212+
Oid sourceTable;
213+
nodemask_t targetNodes;
214+
} MtmCopyRequest;
215+
216+
210217
typedef struct
211218
{
212219
MtmConnectionInfo con;

pglogical_apply.c

+50
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include "catalog/catversion.h"
1515
#include "catalog/dependency.h"
1616
#include "catalog/index.h"
17+
#include "catalog/heap.h"
1718
#include "catalog/namespace.h"
1819
#include "catalog/pg_type.h"
1920

@@ -367,6 +368,49 @@ process_remote_begin(StringInfo s)
367368
return true;
368369
}
369370

371+
static void
372+
process_broadcast_table(StringInfo s)
373+
{
374+
Relation rel;
375+
char ch;
376+
EState *estate;
377+
TupleData new_tuple;
378+
TupleTableSlot *newslot;
379+
TupleTableSlot *oldslot;
380+
HeapTuple tup;
381+
382+
StartTransactionCommand();
383+
384+
ch = pq_getmsgbyte(s);
385+
Assert(ch == 'R');
386+
rel = read_rel(s, AccessExclusiveLock);
387+
388+
heap_truncate_one_rel(rel);
389+
390+
estate = create_rel_estate(rel);
391+
newslot = ExecInitExtraTupleSlot(estate);
392+
oldslot = ExecInitExtraTupleSlot(estate);
393+
ExecSetSlotDescriptor(newslot, RelationGetDescr(rel));
394+
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
395+
396+
ExecOpenIndices(estate->es_result_relation_info, false);
397+
398+
while (s->cursor != s->len) {
399+
read_tuple_parts(s, rel, &new_tuple);
400+
tup = heap_form_tuple(RelationGetDescr(rel),
401+
new_tuple.values, new_tuple.isnull);
402+
ExecStoreTuple(tup, newslot, InvalidBuffer, true);
403+
simple_heap_insert(rel, newslot->tts_tuple);
404+
UserTableUpdateOpenIndexes(estate, newslot);
405+
}
406+
407+
ExecCloseIndices(estate->es_result_relation_info);
408+
ExecResetTupleTable(estate->es_tupleTable, true);
409+
FreeExecutorState(estate);
410+
411+
CommitTransactionCommand();
412+
}
413+
370414
static bool
371415
process_remote_message(StringInfo s)
372416
{
@@ -377,6 +421,12 @@ process_remote_message(StringInfo s)
377421

378422
switch (action)
379423
{
424+
case 'B':
425+
{
426+
process_broadcast_table(s);
427+
standalone = true;
428+
break;
429+
}
380430
case 'C':
381431
{
382432
MTM_LOG1("%d: Executing non-tx utility statement %s", MyProcPid, messageBody);

pglogical_output.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -557,7 +557,7 @@ pg_decode_message(LogicalDecodingContext *ctx,
557557
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
558558

559559
MtmOutputPluginPrepareWrite(ctx, true, !transactional);
560-
data->api->write_message(ctx->out, prefix, sz, message);
560+
data->api->write_message(ctx->out, data, prefix, sz, message);
561561
MtmOutputPluginWrite(ctx, true, !transactional);
562562
}
563563

pglogical_proto.c

+36-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "utils/syscache.h"
3535
#include "utils/timestamp.h"
3636
#include "utils/typcache.h"
37+
#include "utils/snapmgr.h"
3738

3839
#include "multimaster.h"
3940
#include "pglogical_relid_map.h"
@@ -61,7 +62,7 @@ static void pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
6162
Relation rel, HeapTuple oldtuple);
6263

6364
static void pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
64-
Relation rel, HeapTuple tuple);
65+
Relation rel, HeapTuple tuple);
6566
static char decide_datum_transfer(Form_pg_attribute att,
6667
Form_pg_type typclass,
6768
bool allow_internal_basetypes,
@@ -167,8 +168,38 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
167168
}
168169
}
169170

171+
static void pglogical_broadcast_table(StringInfo out, PGLogicalOutputData *data, MtmCopyRequest* copy)
172+
{
173+
if (BIT_CHECK(copy->targetNodes, MtmReplicationNodeId-1)) {
174+
HeapScanDesc scandesc;
175+
HeapTuple tuple;
176+
Relation rel;
177+
178+
StartTransactionCommand();
179+
180+
rel = heap_open(copy->sourceTable, ShareLock);
181+
182+
pq_sendbyte(out, 'M');
183+
pq_sendbyte(out, 'B');
184+
pq_sendint(out, sizeof(*copy), 4);
185+
pq_sendbytes(out, (char*)copy, sizeof(*copy));
186+
187+
pglogical_write_rel(out, data, rel);
188+
189+
scandesc = heap_beginscan(rel, GetTransactionSnapshot(), 0, NULL);
190+
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
191+
{
192+
pglogical_write_tuple(out, data, rel, tuple);
193+
}
194+
heap_endscan(scandesc);
195+
heap_close(rel, ShareLock);
196+
197+
CommitTransactionCommand();
198+
}
199+
}
200+
170201
static void
171-
pglogical_write_message(StringInfo out,
202+
pglogical_write_message(StringInfo out, PGLogicalOutputData *data,
172203
const char *prefix, Size sz, const char *message)
173204
{
174205
MtmLastRelId = InvalidOid;
@@ -199,6 +230,9 @@ pglogical_write_message(StringInfo out,
199230
* so no need to send that to replicas.
200231
*/
201232
return;
233+
case 'B':
234+
pglogical_broadcast_table(out, data, (MtmCopyRequest*)message);
235+
return;
202236
}
203237
pq_sendbyte(out, 'M');
204238
pq_sendbyte(out, *prefix);

pglogical_proto.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedef void (*pglogical_write_begin_fn)(StringInfo out, struct PGLogicalOutputData *data,
2323
ReorderBufferTXN *txn);
24-
typedef void (*pglogical_write_message_fn)(StringInfo out,
24+
typedef void (*pglogical_write_message_fn)(StringInfo out, struct PGLogicalOutputData *data,
2525
const char *prefix, Size sz, const char *message);
2626
typedef void (*pglogical_write_commit_fn)(StringInfo out, struct PGLogicalOutputData *data,
2727
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);

0 commit comments

Comments
 (0)