Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9a51ff8

Browse files
knizhnikkelvich
authored andcommitted
Fix problem with mtm.local_tables
1 parent 241b04f commit 9a51ff8

File tree

2 files changed

+59
-58
lines changed

2 files changed

+59
-58
lines changed

multimaster.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3343,8 +3343,10 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
33433343

33443344
if (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1))
33453345
{
3346-
/* Ok, then start recovery by luckiest walreceiver */
3347-
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3346+
/* Ok, then start recovery by luckiest walreceiver (if there is no donor node).
3347+
* If this node was populated using basebackup, then donorNodeId is not zero and we should choose this node for recovery */
3348+
if ((Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId)
3349+
&& (Mtm->donorNodeId == MtmNodeId || Mtm->donorNodeId == nodeId))
33483350
{
33493351
/* Lock on us */
33503352
Mtm->recoverySlot = nodeId;
@@ -4261,8 +4263,8 @@ Datum mtm_make_table_local(PG_FUNCTION_ARGS)
42614263
/* Form a tuple. */
42624264
memset(nulls, false, sizeof(nulls));
42634265

4264-
values[Anum_mtm_local_tables_rel_schema - 1] = CStringGetTextDatum(schemaName);
4265-
values[Anum_mtm_local_tables_rel_name - 1] = CStringGetTextDatum(tableName);
4266+
values[Anum_mtm_local_tables_rel_schema - 1] = CStringGetDatum(schemaName);
4267+
values[Anum_mtm_local_tables_rel_name - 1] = CStringGetDatum(tableName);
42664268

42674269
tup = heap_form_tuple(tupDesc, values, nulls);
42684270

pglogical_apply.c

Lines changed: 53 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -344,16 +344,16 @@ process_remote_begin(StringInfo s)
344344
nodemask_t participantsMask;
345345
int rc;
346346

347-
gtid.node = pq_getmsgint(s, 4);
348-
gtid.xid = pq_getmsgint64(s);
349-
snapshot = pq_getmsgint64(s);
347+
gtid.node = pq_getmsgint(s, 4);
348+
gtid.xid = pq_getmsgint64(s);
349+
snapshot = pq_getmsgint64(s);
350350
participantsMask = pq_getmsgint64(s);
351351
Assert(gtid.node > 0);
352352

353353
MTM_LOG2("REMOTE begin node=%d xid=%llu snapshot=%lld participantsMask=%llx", gtid.node, (long64)gtid.xid, snapshot, participantsMask);
354-
MtmResetTransaction();
354+
MtmResetTransaction();
355355

356-
SetCurrentStatementStartTimestamp();
356+
SetCurrentStatementStartTimestamp();
357357
StartTransactionCommand();
358358
MtmJoinTransaction(&gtid, snapshot, participantsMask);
359359

@@ -362,7 +362,7 @@ process_remote_begin(StringInfo s)
362362
GucAltered = false;
363363
rc = SPI_execute("RESET SESSION AUTHORIZATION; reset all;", false, 0);
364364
SPI_finish();
365-
if (rc < 0) {
365+
if (rc < 0) {
366366
MTM_ELOG(ERROR, "Failed to set reset context: %d", rc);
367367
}
368368
}
@@ -403,13 +403,13 @@ process_remote_message(StringInfo s)
403403

404404
rc = SPI_execute(messageBody, false, 0);
405405
SPI_finish();
406-
if (rc < 0) {
406+
if (rc < 0) {
407407
MTM_ELOG(ERROR, "Failed to execute utility statement %s", messageBody);
408-
} else {
408+
} else {
409409
MemoryContextSwitchTo(MtmApplyContext);
410410
PushActiveSnapshot(GetTransactionSnapshot());
411411

412-
if (MtmVacuumStmt != NULL) {
412+
if (MtmVacuumStmt != NULL) {
413413
ExecVacuum(MtmVacuumStmt, 1);
414414
} else if (MtmIndexStmt != NULL) {
415415
Oid relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
@@ -426,7 +426,7 @@ process_remote_message(StringInfo s)
426426
true, /* check_rights */
427427
false, /* skip_build */
428428
false); /* quiet */
429-
429+
430430
}
431431
else if (MtmDropStmt != NULL)
432432
{
@@ -449,7 +449,7 @@ process_remote_message(StringInfo s)
449449
if (ActiveSnapshotSet())
450450
PopActiveSnapshot();
451451
}
452-
if (standalone) {
452+
if (standalone) {
453453
CommitTransactionCommand();
454454
}
455455
break;
@@ -462,18 +462,18 @@ process_remote_message(StringInfo s)
462462
/* This function is called directly by receiver, so there is no race condition and we can update
463463
* restartLSN without locks
464464
*/
465-
if (origin_node == MtmReplicationNodeId) {
465+
if (origin_node == MtmReplicationNodeId) {
466466
Assert(msg->origin_lsn == INVALID_LSN);
467467
msg->origin_lsn = MtmSenderWalEnd;
468468
}
469-
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
469+
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
470470
MTM_LOG1("Receive logical abort message for transaction %s from node %d: %llx < %llx", msg->gid, origin_node, Mtm->nodes[origin_node-1].restartLSN, msg->origin_lsn);
471471
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
472-
replorigin_session_origin_lsn = msg->origin_lsn;
472+
replorigin_session_origin_lsn = msg->origin_lsn;
473473
MtmRollbackPreparedTransaction(origin_node, msg->gid);
474-
} else {
475-
if (msg->origin_lsn != INVALID_LSN) {
476-
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx",
474+
} else {
475+
if (msg->origin_lsn != INVALID_LSN) {
476+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx",
477477
msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
478478
}
479479
}
@@ -498,7 +498,7 @@ process_remote_message(StringInfo s)
498498
}
499499
return standalone;
500500
}
501-
501+
502502
static void
503503
read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
504504
{
@@ -529,7 +529,7 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
529529
const char *data;
530530
int len;
531531

532-
if (att->atttypid == InvalidOid) {
532+
if (att->atttypid == InvalidOid) {
533533
continue;
534534
}
535535

@@ -612,13 +612,13 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
612612
static void
613613
close_rel(Relation rel)
614614
{
615-
if (rel != NULL)
615+
if (rel != NULL)
616616
{
617-
heap_close(rel, NoLock);
618-
}
617+
heap_close(rel, NoLock);
618+
}
619619
}
620620

621-
static Relation
621+
static Relation
622622
read_rel(StringInfo s, LOCKMODE mode)
623623
{
624624
int relnamelen;
@@ -629,20 +629,20 @@ read_rel(StringInfo s, LOCKMODE mode)
629629
MemoryContext old_context;
630630

631631
local_relid = pglogical_relid_map_get(remote_relid);
632-
if (local_relid == InvalidOid) {
632+
if (local_relid == InvalidOid) {
633633
rv = makeNode(RangeVar);
634634

635635
nspnamelen = pq_getmsgbyte(s);
636636
rv->schemaname = (char *) pq_getmsgbytes(s, nspnamelen);
637-
637+
638638
relnamelen = pq_getmsgbyte(s);
639639
rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
640-
640+
641641
local_relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);
642642
old_context = MemoryContextSwitchTo(TopMemoryContext);
643643
pglogical_relid_map_put(remote_relid, local_relid);
644644
MemoryContextSwitchTo(old_context);
645-
} else {
645+
} else {
646646
nspnamelen = pq_getmsgbyte(s);
647647
s->cursor += nspnamelen;
648648
relnamelen = pq_getmsgbyte(s);
@@ -707,29 +707,29 @@ process_remote_commit(StringInfo in)
707707
Assert(IsTransactionState() && TransactionIdIsValid(MtmGetCurrentTransactionId()));
708708
strncpy(gid, pq_getmsgstring(in), sizeof gid);
709709
MTM_LOG2("%d: PGLOGICAL_PREPARE %s, (%llx,%llx,%llx)", MyProcPid, gid, commit_lsn, end_lsn, origin_lsn);
710-
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
711-
MTM_LOG1("Avoid prepare of previously aborted global transaction %s", gid);
710+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
711+
MTM_LOG1("Avoid prepare of previously aborted global transaction %s", gid);
712712
AbortCurrentTransaction();
713-
} else {
713+
} else {
714714
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
715715
BeginTransactionBlock(false);
716716
CommitTransactionCommand();
717717
StartTransactionCommand();
718-
718+
719719
MtmBeginSession(origin_node);
720720
/* PREPARE itself */
721721
MtmSetCurrentTransactionGID(gid);
722722
PrepareTransactionBlock(gid);
723723
CommitTransactionCommand();
724724

725-
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_UNKNOWN) == TRANSACTION_STATUS_ABORTED) {
726-
MTM_LOG1("Perform delayed rollback of prepared global transaction %s", gid);
725+
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_UNKNOWN) == TRANSACTION_STATUS_ABORTED) {
726+
MTM_LOG1("Perform delayed rollback of prepared global transaction %s", gid);
727727
StartTransactionCommand();
728728
MtmSetCurrentTransactionGID(gid);
729729
FinishPreparedTransaction(gid, false);
730-
CommitTransactionCommand();
730+
CommitTransactionCommand();
731731
Assert(!MtmTransIsActive());
732-
}
732+
}
733733
MtmEndSession(origin_node, true);
734734
}
735735
break;
@@ -771,7 +771,7 @@ process_remote_commit(StringInfo in)
771771
default:
772772
Assert(false);
773773
}
774-
if (Mtm->status == MTM_RECOVERY) {
774+
if (Mtm->status == MTM_RECOVERY) {
775775
MTM_LOG1("Recover transaction %s event=%d", gid, event);
776776
}
777777
MtmUpdateLsnMapping(MtmReplicationNodeId, end_lsn);
@@ -871,12 +871,12 @@ process_remote_insert(StringInfo s, Relation rel)
871871
if (strcmp(RelationGetRelationName(rel), MULTIMASTER_LOCAL_TABLES_TABLE) == 0 &&
872872
strcmp(get_namespace_name(RelationGetNamespace(rel)), MULTIMASTER_SCHEMA_NAME) == 0)
873873
{
874-
MtmMakeTableLocal(TextDatumGetCString(new_tuple.values[0]), TextDatumGetCString(new_tuple.values[1]));
874+
MtmMakeTableLocal((char*)DatumGetPointer(new_tuple.values[0]), (char*)DatumGetPointer(new_tuple.values[1]));
875875
}
876-
876+
877877
ExecResetTupleTable(estate->es_tupleTable, true);
878878
FreeExecutorState(estate);
879-
879+
880880
CommandCounterIncrement();
881881
}
882882

@@ -989,12 +989,12 @@ process_remote_update(StringInfo s, Relation rel)
989989
errdetail("Most likely we have DELETE-UPDATE conflict")));
990990

991991
}
992-
992+
993993
PopActiveSnapshot();
994-
994+
995995
/* release locks upon commit */
996996
index_close(idxrel, NoLock);
997-
997+
998998
ExecResetTupleTable(estate->es_tupleTable, true);
999999
FreeExecutorState(estate);
10001000

@@ -1089,7 +1089,7 @@ void MtmExecutor(void* work, size_t size)
10891089
s.len = size;
10901090
s.maxlen = -1;
10911091
s.cursor = 0;
1092-
1092+
10931093
if (MtmApplyContext == NULL) {
10941094
MtmApplyContext = AllocSetContextCreate(TopMemoryContext,
10951095
"ApplyContext",
@@ -1100,15 +1100,15 @@ void MtmExecutor(void* work, size_t size)
11001100
top_context = MemoryContextSwitchTo(MtmApplyContext);
11011101
replorigin_session_origin = InvalidRepOriginId;
11021102
PG_TRY();
1103-
{
1103+
{
11041104
bool inside_transaction = true;
1105-
do {
1105+
do {
11061106
char action = pq_getmsgbyte(&s);
11071107
old_context = MemoryContextSwitchTo(MtmApplyContext);
1108-
1108+
11091109
MTM_LOG2("%d: REMOTE process action %c", MyProcPid, action);
11101110
#if 0
1111-
if (Mtm->status == MTM_RECOVERY) {
1111+
if (Mtm->status == MTM_RECOVERY) {
11121112
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);
11131113
}
11141114
#endif
@@ -1150,7 +1150,7 @@ void MtmExecutor(void* work, size_t size)
11501150
}
11511151
case '(':
11521152
{
1153-
size_t size = pq_getmsgint(&s, 4);
1153+
size_t size = pq_getmsgint(&s, 4);
11541154
s.data = MemoryContextAlloc(TopMemoryContext, size);
11551155
save_cursor = s.cursor;
11561156
save_len = s.len;
@@ -1175,10 +1175,10 @@ void MtmExecutor(void* work, size_t size)
11751175
relid = RelationGetRelid(rel);
11761176
close_rel(rel);
11771177
rel = NULL;
1178-
next = pq_getmsgint64(&s);
1178+
next = pq_getmsgint64(&s);
11791179
AdjustSequence(relid, next);
11801180
break;
1181-
}
1181+
}
11821182
case '0':
11831183
Assert(rel != NULL);
11841184
heap_truncate_one_rel(rel);
@@ -1198,7 +1198,7 @@ void MtmExecutor(void* work, size_t size)
11981198
}
11991199
default:
12001200
MTM_ELOG(ERROR, "unknown action of type %c", action);
1201-
}
1201+
}
12021202
MemoryContextSwitchTo(old_context);
12031203
MemoryContextResetAndDeleteChildren(MtmApplyContext);
12041204
} while (inside_transaction);
@@ -1217,16 +1217,15 @@ void MtmExecutor(void* work, size_t size)
12171217
MTM_LOG2("%d: REMOTE end abort transaction %llu", MyProcPid, (long64)MtmGetCurrentTransactionId());
12181218
}
12191219
PG_END_TRY();
1220-
if (s.data != work) {
1220+
if (s.data != work) {
12211221
pfree(s.data);
12221222
}
12231223
#if 0 /* spill file is expecrted to be closed by tranaction commit or rollback */
1224-
if (spill_file >= 0) {
1224+
if (spill_file >= 0) {
12251225
MtmCloseSpillFile(spill_file);
12261226
}
12271227
#endif
12281228
MemoryContextSwitchTo(top_context);
12291229
MemoryContextResetAndDeleteChildren(MtmApplyContext);
12301230
MtmReleaseLocks();
12311231
}
1232-

0 commit comments

Comments
 (0)