Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit cbdf8bb

Browse files
committed
Multimaster first sign of life
1 parent 23aa459 commit cbdf8bb

File tree

10 files changed

+191
-75
lines changed

10 files changed

+191
-75
lines changed

contrib/multimaster/decoder_raw.c

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,10 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
108108
data->isLocal = true;
109109
} else {
110110
OutputPluginPrepareWrite(ctx, true);
111+
elog(WARNING, "Send transation to %u to replica", txn->xid);
111112
appendStringInfo(ctx->out, "BEGIN %u;", txn->xid);
112113
OutputPluginWrite(ctx, true);
114+
data->isLocal = false;
113115
}
114116
}
115117

@@ -120,6 +122,7 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
120122
{
121123
DecoderRawData *data = ctx->output_plugin_private;
122124
if (!data->isLocal) {
125+
elog(WARNING, "Send commit of %u to replica", txn->xid);
123126
OutputPluginPrepareWrite(ctx, true);
124127
appendStringInfoString(ctx->out, "COMMIT;");
125128
OutputPluginWrite(ctx, true);
@@ -476,7 +479,9 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
476479
if (data->isLocal) {
477480
return;
478481
}
479-
/* Avoid leaking memory by using and resetting our own context */
482+
elog(WARNING, "Send action %d in transaction %u to replica", change->action, txn->xid);
483+
484+
/* Avoid leaking memory by using and resetting our own context */
480485
old = MemoryContextSwitchTo(data->context);
481486

482487
/*

contrib/multimaster/dtmd/src/main.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ static xid_t get_global_xmin() {
270270
static void onbegin(client_t client, int argc, xid_t *argv) {
271271
Transaction *t;
272272
CHECK(
273-
argc == 1,
273+
argc == 2,
274274
client,
275275
"BEGIN: wrong number of arguments"
276276
);
@@ -292,7 +292,7 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
292292

293293
prev_gxid = t->xid = next_gxid++;
294294
t->snapshots_count = 0;
295-
t->size = 1;
295+
t->size = argv[1];
296296

297297
t->collision = transaction_hash[t->xid % MAX_TRANSACTIONS];
298298
transaction_hash[t->xid % MAX_TRANSACTIONS] = t;
@@ -448,7 +448,6 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
448448
if (CLIENT_XID(client) == INVALID_XID) {
449449
CLIENT_SNAPSENT(client) = 0;
450450
CLIENT_XID(client) = t->xid;
451-
t->size += 1;
452451
}
453452

454453
CHECK(

contrib/multimaster/init.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
create extension multimaster;
2+
create table foo(k integer primary key, v varchar);

contrib/multimaster/libdtm.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ void DtmInitSnapshot(Snapshot snapshot)
296296
#endif
297297
}
298298

299-
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
299+
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin, int nParticipants)
300300
{
301301
int i;
302302
xid_t xid;
@@ -307,7 +307,7 @@ TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
307307
assert(snapshot != NULL);
308308

309309
// command
310-
if (!dtm_send_command(dtm, CMD_BEGIN, 0)) goto failure;
310+
if (!dtm_send_command(dtm, CMD_BEGIN, 1, nParticipants)) goto failure;
311311

312312
// results
313313
reslen = dtm_recv_results(dtm, RESULTS_SIZE, results);

contrib/multimaster/libdtm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ void DtmInitSnapshot(Snapshot snapshot);
2222
* smallest xmin among all snapshots known to arbiter. Returns INVALID_XID
2323
* otherwise.
2424
*/
25-
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin);
25+
TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin, int nParticipants);
2626

2727
/**
2828
* Asks the arbiter for a fresh snapshot. Fills the 'snapshot' and 'gxmin' on

0 commit comments

Comments
 (0)