Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c22de58

Browse files
knizhnikkelvich
authored andcommitted
Fix MtmAdjustOldestXid
1 parent 990ccfc commit c22de58

File tree

3 files changed

+52
-30
lines changed

3 files changed

+52
-30
lines changed

multimaster.c

+13-3
Original file line numberDiff line numberDiff line change
@@ -517,9 +517,16 @@ MtmAdjustOldestXid(TransactionId xid)
517517
MTM_LOG2("%d: MtmAdjustOldestXid(%d): snapshot=%ld, csn=%ld, status=%d", MyProcPid, xid, ts != NULL ? ts->snapshot : 0, ts != NULL ? ts->csn : 0, ts != NULL ? ts->status : -1);
518518
Mtm->gcCount = 0;
519519

520+
//return FirstNormalTransactionId;
521+
520522
if (ts != NULL) {
521523
oldestSnapshot = ts->snapshot;
522-
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
524+
Assert(oldestSnapshot != INVALID_CSN);
525+
if (Mtm->nodes[MtmNodeId-1].oldestSnapshot < oldestSnapshot) {
526+
Mtm->nodes[MtmNodeId-1].oldestSnapshot = oldestSnapshot;
527+
} else {
528+
oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
529+
}
523530
for (i = 0; i < Mtm->nAllNodes; i++) {
524531
if (!BIT_CHECK(Mtm->disabledNodeMask, i)
525532
&& Mtm->nodes[i].oldestSnapshot < oldestSnapshot)
@@ -532,9 +539,11 @@ MtmAdjustOldestXid(TransactionId xid)
532539
for (ts = Mtm->transListHead;
533540
ts != NULL
534541
&& ts->csn < oldestSnapshot
535-
&& TransactionIdPrecedes(ts->xid, xid)
542+
&& TransactionIdPrecedes(ts->xid, xid);
543+
/*
536544
&& (ts->status == TRANSACTION_STATUS_COMMITTED ||
537545
ts->status == TRANSACTION_STATUS_ABORTED);
546+
*/
538547
prev = ts, ts = ts->next)
539548
{
540549
if (prev != NULL) {
@@ -547,9 +556,10 @@ MtmAdjustOldestXid(TransactionId xid)
547556
if (MtmUseDtm)
548557
{
549558
if (prev != NULL) {
559+
MTM_LOG1("%d: MtmAdjustOldestXid: oldestXid=%d, prev->xid=%d, prev->status=%d, prev->snapshot=%ld, ts->xid=%d, ts->status=%d, ts->snapshot=%ld, oldestSnapshot=%ld",
560+
MyProcPid, xid, prev->xid, prev->status, prev->snapshot, (ts ? ts->xid : 0), (ts ? ts->status : -1), (ts ? ts->snapshot : -1), oldestSnapshot);
550561
Mtm->transListHead = prev;
551562
Mtm->oldestXid = xid = prev->xid;
552-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, oldestSnapshot=%ld", MyProcPid, xid, oldestSnapshot);
553563
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
554564
xid = Mtm->oldestXid;
555565
}

pglogical_output.c

+29-24
Original file line numberDiff line numberDiff line change
@@ -384,30 +384,31 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
384384
send_replication_origin &= txn->origin_id != InvalidRepOriginId;
385385

386386
OutputPluginPrepareWrite(ctx, !send_replication_origin);
387-
data->api->write_begin(ctx->out, data, txn);
387+
if (data->api) {
388+
data->api->write_begin(ctx->out, data, txn);
388389

389-
if (send_replication_origin)
390-
{
391-
char *origin;
392-
393-
/* Message boundary */
394-
OutputPluginWrite(ctx, false);
395-
OutputPluginPrepareWrite(ctx, true);
396-
397-
/*
398-
* XXX: which behaviour we want here?
399-
*
400-
* Alternatives:
401-
* - don't send origin message if origin name not found
402-
* (that's what we do now)
403-
* - throw error - that will break replication, not good
404-
* - send some special "unknown" origin
405-
*/
406-
if (data->api->write_origin &&
407-
replorigin_by_oid(txn->origin_id, true, &origin))
390+
if (send_replication_origin)
391+
{
392+
char *origin;
393+
394+
/* Message boundary */
395+
OutputPluginWrite(ctx, false);
396+
OutputPluginPrepareWrite(ctx, true);
397+
398+
/*
399+
* XXX: which behaviour we want here?
400+
*
401+
* Alternatives:
402+
* - don't send origin message if origin name not found
403+
* (that's what we do now)
404+
* - throw error - that will break replication, not good
405+
* - send some special "unknown" origin
406+
*/
407+
if (data->api->write_origin &&
408+
replorigin_by_oid(txn->origin_id, true, &origin))
408409
data->api->write_origin(ctx->out, origin, txn->origin_lsn);
410+
}
409411
}
410-
411412
OutputPluginWrite(ctx, true);
412413
}
413414

@@ -421,7 +422,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421422
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
422423

423424
OutputPluginPrepareWrite(ctx, true);
424-
data->api->write_commit(ctx->out, data, txn, commit_lsn);
425+
if (data->api) {
426+
data->api->write_commit(ctx->out, data, txn, commit_lsn);
427+
}
425428
OutputPluginWrite(ctx, true);
426429
}
427430

@@ -433,7 +436,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
433436
MemoryContext old;
434437

435438
/* First check the table filter */
436-
if (!call_row_filter_hook(data, txn, relation, change))
439+
if (!call_row_filter_hook(data, txn, relation, change) || data->api == NULL)
437440
return;
438441

439442
/* Avoid leaking memory by using and resetting our own context */
@@ -539,7 +542,9 @@ send_startup_message(LogicalDecodingContext *ctx,
539542
*/
540543

541544
OutputPluginPrepareWrite(ctx, last_message);
542-
data->api->write_startup_message(ctx->out, msg);
545+
if (data->api) {
546+
data->api->write_startup_message(ctx->out, msg);
547+
}
543548
OutputPluginWrite(ctx, last_message);
544549

545550
pfree(msg);

tests2/lib/bank_client.py

+10-3
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,13 @@ def check_total(self):
153153
def tx(conn, cur):
154154
cur.execute('select sum(amount) from bank_test')
155155
res = cur.fetchone()
156+
total = res[0]
157+
if total != 0:
158+
cur.execute('select mtm.get_snapshot()')
159+
res = cur.fetchone()
160+
print("Isolation error, total = %d, node = %d, snapshot = %d" % (total,self.node_id,res[0]))
161+
#raise BaseException
156162
conn.commit()
157-
if res[0] != 0:
158-
print("Isolation error, total = %d, node = %d" % (res[0],self.node_id))
159-
raise BaseException
160163

161164
self.exec_tx('total', tx)
162165

@@ -175,10 +178,14 @@ def tx(conn, cur):
175178
set amount = amount - %s
176179
where uid = %s''',
177180
(amount, from_uid))
181+
if (cur.rowcount != 1):
182+
raise BaseException
178183
cur.execute('''update bank_test
179184
set amount = amount + %s
180185
where uid = %s''',
181186
(amount, to_uid))
187+
if (cur.rowcount != 1):
188+
raise BaseException
182189
conn.commit()
183190

184191
self.exec_tx('transfer', tx)

0 commit comments

Comments
 (0)