Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d26d915

Browse files
committed
Merge branch 'master' into more_tests
2 parents 2dce397 + bbefe45 commit d26d915

File tree

4 files changed

+48
-29
lines changed

4 files changed

+48
-29
lines changed

contrib/mmts/multimaster.c

+27-9
Original file line numberDiff line numberDiff line change
@@ -1531,8 +1531,8 @@ static void MtmEnableNode(int nodeId)
15311531
void MtmRecoveryCompleted(void)
15321532
{
15331533
int i;
1534-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, live nodes=%d",
1535-
MtmNodeId, (long long) Mtm->disabledNodeMask, (long long) Mtm->connectivityMask, Mtm->nLiveNodes);
1534+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, endLSN=%lx, live nodes=%d",
1535+
MtmNodeId, (long long) Mtm->disabledNodeMask, (long long) Mtm->connectivityMask, GetXLogInsertRecPtr(), Mtm->nLiveNodes);
15361536
MtmLock(LW_EXCLUSIVE);
15371537
Mtm->recoverySlot = 0;
15381538
Mtm->recoveredLSN = GetXLogInsertRecPtr();
@@ -1542,7 +1542,7 @@ void MtmRecoveryCompleted(void)
15421542
for (i = 0; i < Mtm->nAllNodes; i++) {
15431543
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
15441544
}
1545-
/* Mode will be changed to online once all logical reciever are connected */
1545+
/* Mode will be changed to online once all logical receivers are connected */
15461546
MtmSwitchClusterMode(MTM_CONNECTED);
15471547
MtmUnlock();
15481548
}
@@ -2131,7 +2131,6 @@ static void MtmInitialize()
21312131
Mtm->nodes[i].restartLSN = InvalidXLogRecPtr;
21322132
Mtm->nodes[i].originId = InvalidRepOriginId;
21332133
Mtm->nodes[i].timeline = 0;
2134-
Mtm->nodes[i].recoveredLSN = InvalidXLogRecPtr;
21352134
}
21362135
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
21372136
/* All transactions originating from the current node should be ignored during recovery */
@@ -2884,13 +2883,14 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
28842883
{
28852884
MtmReplicationMode mode = REPLMODE_OPEN_EXISTED;
28862885

2886+
MtmLock(LW_EXCLUSIVE);
28872887
while ((Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) || BIT_CHECK(Mtm->disabledNodeMask, nodeId-1))
28882888
{
28892889
if (*shutdown)
28902890
{
2891+
MtmUnlock();
28912892
return REPLMODE_EXIT;
28922893
}
2893-
MtmLock(LW_EXCLUSIVE);
28942894
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
28952895
mode = REPLMODE_CREATE_NEW;
28962896
}
@@ -2913,6 +2913,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29132913
MtmUnlock();
29142914
/* delay opening of other slots until recovery is completed */
29152915
MtmSleep(STATUS_POLL_DELAY);
2916+
MtmLock(LW_EXCLUSIVE);
29162917
}
29172918
if (mode == REPLMODE_RECOVERED) {
29182919
MTM_LOG1("%d: Restart replication from node %d after end of recovery", MyProcPid, nodeId);
@@ -2921,6 +2922,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29212922
} else {
29222923
MTM_LOG1("%d: Continue replication from node %d", MyProcPid, nodeId);
29232924
}
2925+
MtmUnlock();
29242926
return mode;
29252927
}
29262928

@@ -3014,7 +3016,12 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30143016
}
30153017
} else if (strcmp("mtm_recovered_pos", elem->defname) == 0) {
30163018
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
3017-
sscanf(strVal(elem->arg), "%lx", &Mtm->nodes[MtmReplicationNodeId-1].recoveredLSN);
3019+
XLogRecPtr recoveredLSN;
3020+
sscanf(strVal(elem->arg), "%lx", &recoveredLSN);
3021+
MTM_LOG1("Recovered position of node %d is %lx", MtmReplicationNodeId, recoveredLSN);
3022+
if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
3023+
Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
3024+
}
30183025
} else {
30193026
elog(ERROR, "Recovered position is not specified");
30203027
}
@@ -3129,16 +3136,21 @@ MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
31293136
return isDistributed;
31303137
}
31313138

3139+
/*
3140+
* Filter received transactions at the destination side.
3141+
* This function is executed by the receiver, so there are no race conditions and it is possible to update nodes[i].restartLSN without a lock.
3142+
*/
31323143
bool MtmFilterTransaction(char* record, int size)
31333144
{
31343145
StringInfoData s;
31353146
uint8 flags;
31363147
XLogRecPtr origin_lsn;
31373148
XLogRecPtr end_lsn;
3149+
XLogRecPtr restart_lsn;
31383150
int replication_node;
31393151
int origin_node;
31403152
char const* gid = "";
3141-
bool duplicate;
3153+
bool duplicate = false;
31423154

31433155
s.data = record;
31443156
s.len = size;
@@ -3174,11 +3186,17 @@ bool MtmFilterTransaction(char* record, int size)
31743186
default:
31753187
break;
31763188
}
3189+
restart_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
3190+
if (Mtm->nodes[origin_node-1].restartLSN < restart_lsn) {
3191+
Mtm->nodes[origin_node-1].restartLSN = restart_lsn;
3192+
} else {
3193+
duplicate = true;
3194+
}
3195+
31773196
//duplicate = Mtm->status == MTM_RECOVERY && origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
3178-
duplicate = origin_lsn != InvalidXLogRecPtr && origin_lsn <= Mtm->nodes[origin_node-1].restartLSN;
31793197

31803198
MTM_LOG1("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3181-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
3199+
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
31823200
return duplicate;
31833201
}
31843202

contrib/mmts/multimaster.h

-1
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ typedef struct
204204
void* lockGraphData;
205205
int lockGraphAllocated;
206206
int lockGraphUsed;
207-
XLogRecPtr recoveredLSN;
208207
} MtmNodeInfo;
209208

210209
typedef struct MtmTransState

contrib/mmts/pglogical_apply.c

+8-5
Original file line numberDiff line numberDiff line change
@@ -430,11 +430,17 @@ process_remote_message(StringInfo s)
430430
MtmAbortLogicalMessage* msg = (MtmAbortLogicalMessage*)messageBody;
431431
int origin_node = msg->origin_node;
432432
Assert(messageSize == sizeof(MtmAbortLogicalMessage));
433+
/* This function is called directly by receiver, so there is no race condition and we can update
434+
* restartLSN without locks
435+
*/
433436
if (Mtm->nodes[origin_node-1].restartLSN < msg->origin_lsn) {
434437
Mtm->nodes[origin_node-1].restartLSN = msg->origin_lsn;
438+
replorigin_session_origin_lsn = msg->origin_lsn;
439+
MtmRollbackPreparedTransaction(origin_node, msg->gid);
440+
} else {
441+
MTM_LOG1("Ignore rollback of transaction %s from node %d because it's LSN %lx <= %lx",
442+
msg->gid, origin_node, msg->origin_lsn, Mtm->nodes[origin_node-1].restartLSN);
435443
}
436-
replorigin_session_origin_lsn = msg->origin_lsn;
437-
MtmRollbackPreparedTransaction(origin_node, msg->gid);
438444
standalone = true;
439445
break;
440446
}
@@ -611,9 +617,6 @@ process_remote_commit(StringInfo in)
611617
origin_lsn = pq_getmsgint64(in);
612618

613619
replorigin_session_origin_lsn = origin_node == MtmReplicationNodeId ? end_lsn : origin_lsn;
614-
if (Mtm->nodes[origin_node-1].restartLSN < replorigin_session_origin_lsn) {
615-
Mtm->nodes[origin_node-1].restartLSN = replorigin_session_origin_lsn;
616-
}
617620
Assert(replorigin_session_origin == InvalidRepOriginId);
618621

619622
switch(PGLOGICAL_XACT_EVENT(flags))

contrib/mmts/pglogical_receiver.c

+13-14
Original file line numberDiff line numberDiff line change
@@ -260,9 +260,8 @@ pglogical_receiver_main(Datum main_arg)
260260
{
261261
int count;
262262
ConnStatusType status;
263-
XLogRecPtr originStartPos = InvalidXLogRecPtr;
263+
XLogRecPtr originStartPos = Mtm->nodes[nodeId-1].restartLSN;
264264
int timeline;
265-
bool newTimeline = false;
266265

267266
/*
268267
* Determine when and how we should open replication slot.
@@ -291,12 +290,12 @@ pglogical_receiver_main(Datum main_arg)
291290
if ((mode == REPLMODE_OPEN_EXISTED && timeline != Mtm->nodes[nodeId-1].timeline)
292291
|| mode == REPLMODE_CREATE_NEW)
293292
{ /* recreate slot */
293+
elog(LOG, "Recreate replication slot %s", slotName);
294294
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
295295
res = PQexec(conn, query->data);
296296
PQclear(res);
297297
resetPQExpBuffer(query);
298298
timeline = Mtm->nodes[nodeId-1].timeline;
299-
newTimeline = true;
300299
}
301300
/* My original assumption was that we can perform recovery only from an existing slot,
302301
* but unfortunately it looks like slots can "disappear" together with the WAL-sender.
@@ -322,11 +321,7 @@ pglogical_receiver_main(Datum main_arg)
322321
}
323322

324323
/* Start logical replication at specified position */
325-
if (mode == REPLMODE_RECOVERED) {
326-
originStartPos = Mtm->nodes[nodeId-1].restartLSN;
327-
MTM_LOG1("Restart replication from node %d from position %lx", nodeId, originStartPos);
328-
}
329-
if (originStartPos == InvalidXLogRecPtr && !newTimeline) {
324+
if (originStartPos == InvalidXLogRecPtr) {
330325
StartTransactionCommand();
331326
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
332327
originId = replorigin_by_name(originName, true);
@@ -349,10 +344,11 @@ pglogical_receiver_main(Datum main_arg)
349344
}
350345
Mtm->nodes[nodeId-1].originId = originId;
351346
CommitTransactionCommand();
352-
} else if (mode == REPLMODE_CREATE_NEW) {
353-
originStartPos = Mtm->nodes[nodeId-1].recoveredLSN;
354-
}
347+
}
355348

349+
MTM_LOG1("Start replication on slot %s from node %d at position %lx, mode %s, recovered lsn %lx",
350+
slotName, nodeId, originStartPos, MtmReplicationModeName[mode], Mtm->recoveredLSN);
351+
356352
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx', \"mtm_recovered_pos\" '%lx')",
357353
slotName,
358354
(uint32) (originStartPos >> 32),
@@ -409,13 +405,16 @@ pglogical_receiver_main(Datum main_arg)
409405
ereport(LOG, (errmsg("%s: restart WAL receiver because node was switched to %s mode", worker_proc, MtmNodeStatusMnem[Mtm->status])));
410406
break;
411407
}
412-
if (count != Mtm->recoveryCount) {
413-
408+
if (count != Mtm->recoveryCount) {
414409
ereport(LOG, (errmsg("%s: restart WAL receiver because node was recovered", worker_proc)));
415410
break;
416411
}
417412

418-
413+
if (timeline != Mtm->nodes[nodeId-1].timeline) {
414+
ereport(LOG, (errmsg("%s: restart WAL receiver because node %d timeline is changed", worker_proc, nodeId)));
415+
break;
416+
}
417+
419418
/*
420419
* Receive data.
421420
*/

0 commit comments

Comments
 (0)