Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1b8929d

Browse files
committed
Add mmts control file
1 parent 46f63ec commit 1b8929d

File tree

3 files changed

+74
-8
lines changed

3 files changed

+74
-8
lines changed

contrib/mmts/multimaster.c

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,7 @@ bool MtmVolksWagenMode;
214214
TransactionId MtmUtilityProcessedInXid;
215215

216216
static char* MtmConnStrs;
217+
static char* MtmClusterName;
217218
static int MtmQueueSize;
218219
static int MtmWorkers;
219220
static int MtmVacuumDelay;
@@ -1867,6 +1868,39 @@ static void MtmRaftableInitialize()
18671868
raftable_start(MtmNodeId - 1);
18681869
}
18691870

1871+
static void MtmCheckControlFile(void)
1872+
{
1873+
char controlFilePath[MAXPGPATH];
1874+
char buf[MULTIMASTER_MAX_CTL_STR_SIZE];
1875+
FILE* f;
1876+
snprintf(controlFilePath, MAXPGPATH, "%s/global/mmts_control", DataDir);
1877+
f = fopen(controlFilePath, "r");
1878+
if (f != NULL && fgets(buf, sizeof buf, f)) {
1879+
char* sep = strchr(buf, ':');
1880+
if (sep == NULL) {
1881+
elog(FATAL, "File mmts_control doesn't contain cluster name");
1882+
}
1883+
*sep = '\0';
1884+
if (strcmp(buf, MtmClusterName) != 0) {
1885+
elog(FATAL, "Database belongs to some other cluster %s rather than %s", buf, MtmClusterName);
1886+
}
1887+
if (sscanf(sep+1, "%d", &Mtm->donorNodeId) != 1) {
1888+
elog(FATAL, "File mmts_control doesn't contain node id");
1889+
}
1890+
fclose(f);
1891+
} else {
1892+
if (f != NULL) {
1893+
fclose(f);
1894+
}
1895+
f = fopen(controlFilePath, "w");
1896+
if (f == NULL) {
1897+
elog(FATAL, "Failed to create mmts_control file: %m");
1898+
}
1899+
Mtm->donorNodeId = -1;
1900+
fprintf(f, "%s:%d\n", MtmClusterName, Mtm->donorNodeId);
1901+
fclose(f);
1902+
}
1903+
}
18701904

18711905
static void MtmInitialize()
18721906
{
@@ -1931,6 +1965,8 @@ static void MtmInitialize()
19311965
MtmDoReplication = true;
19321966
TM = &MtmTM;
19331967
LWLockRelease(AddinShmemInitLock);
1968+
1969+
MtmCheckControlFile();
19341970
}
19351971

19361972
static void
@@ -2472,6 +2508,19 @@ _PG_init(void)
24722508
NULL /* GucShowHook show_hook */
24732509
);
24742510

2511+
DefineCustomStringVariable(
2512+
"multimaster.cluster_name",
2513+
"Name of the cluster",
2514+
NULL,
2515+
&MtmClusterName,
2516+
"mmts",
2517+
PGC_BACKEND, /* context */
2518+
0, /* flags */
2519+
NULL, /* GucStringCheckHook check_hook */
2520+
NULL, /* GucStringAssignHook assign_hook */
2521+
NULL /* GucShowHook show_hook */
2522+
);
2523+
24752524
DefineCustomIntVariable(
24762525
"multimaster.node_id",
24772526
"Multimaster node ID",
@@ -2609,8 +2658,10 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
26092658
MtmLock(LW_EXCLUSIVE);
26102659
if (Mtm->status == MTM_RECOVERY) {
26112660
recovery = true;
2612-
if (Mtm->recoverySlot == 0 || Mtm->recoverySlot == nodeId) {
2613-
/* Choose for recovery first available slot */
2661+
if ((Mtm->recoverySlot == 0 && (Mtm->donorNodeId < 0 || Mtm->donorNodeId == nodeId))
2662+
|| Mtm->recoverySlot == nodeId)
2663+
{
2664+
/* Choose for recovery first available slot or slot of donor node (if any) */
26142665
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
26152666
Mtm->recoverySlot = nodeId;
26162667
Mtm->nReceivers = 0;
@@ -2698,6 +2749,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
26982749
{
26992750
ListCell *param;
27002751
bool recoveryCompleted = false;
2752+
XLogRecPtr recoveryStartPos = InvalidXLogRecPtr;
2753+
27012754
MtmIsRecoverySession = false;
27022755
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
27032756
Mtm->nodes[MtmReplicationNodeId-1].senderStartTime = MtmGetSystemTime();
@@ -2717,11 +2770,21 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
27172770
elog(ERROR, "Replication mode is not specified");
27182771
}
27192772
break;
2773+
} else if (strcmp("mtm_restart_pos", elem->defname) == 0) {
2774+
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
2775+
recoveryStartPos = intVal(elem->arg);
2776+
} else {
2777+
elog(ERROR, "Restart position is not specified");
2778+
}
27202779
}
27212780
}
27222781
MtmLock(LW_EXCLUSIVE);
2723-
if (MtmIsRecoverySession) {
2724-
MTM_LOG1("%d: Node %d start recovery of node %d", MyProcPid, MtmNodeId, MtmReplicationNodeId);
2782+
if (MtmIsRecoverySession) {
2783+
MTM_LOG1("%d: Node %d start recovery of node %d at position %lx", MyProcPid, MtmNodeId, MtmReplicationNodeId, recoveryStartPos);
2784+
Assert(MyReplicationSlot != NULL);
2785+
if (recoveryStartPos < MyReplicationSlot->data.restart_lsn) {
2786+
elog(ERROR, "Specified recovery start position %lx is beyond restart lsn %lx", recoveryStartPos, MyReplicationSlot->data.restart_lsn);
2787+
}
27252788
if (!BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
27262789
MtmDisableNode(MtmReplicationNodeId);
27272790
MtmCheckQuorum();

contrib/mmts/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
5353
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
5454
#define MULTIMASTER_MAX_LOCAL_TABLES 256
55+
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
5556
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
5657
#define MULTIMASTER_ADMIN "mtm_admin"
5758

@@ -241,6 +242,7 @@ typedef struct
241242
int nActiveTransactions; /* Nunmber of active 2PC transactions */
242243
int nConfigChanges; /* Number of cluster configuration changes */
243244
int recoveryCount; /* Number of completed recoveries */
245+
int donorNodeId; /* Cluster node from which this node was populated */
244246
int64 timeShift; /* Local time correction */
245247
csn_t csn; /* Last obtained timestamp: used to provide unique acending CSNs based on system time */
246248
csn_t lastCsn; /* CSN of last committed transaction */

contrib/mmts/pglogical_receiver.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ pglogical_receiver_main(Datum main_arg)
285285
timeline = Mtm->nodes[nodeId-1].timeline;
286286
newTimeline = true;
287287
}
288-
/* My original assumption was that we can perfrom recovery only fromm existed slot,
288+
/* My original assumption was that we can perform recovery only from an existing slot,
289289
* but unfortunately it looks like slots can "disappear" together with WAL-sender.
290290
* So let's try to recreate slot always. */
291291
/* if (mode != REPLMODE_REPLICATION) */
@@ -325,7 +325,7 @@ pglogical_receiver_main(Datum main_arg)
325325
* They are either empty, or the new node is synchronized using base_backup.
326326
* So we assume that LSNs are the same for local and remote node
327327
*/
328-
originStartPos = Mtm->status == MTM_RECOVERY ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
328+
originStartPos = Mtm->status == MTM_RECOVERY && Mtm->donorNodeId == nodeId ? GetXLogInsertRecPtr() : InvalidXLogRecPtr;
329329
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
330330
} else {
331331
originStartPos = replorigin_get_progress(originId, false);
@@ -335,13 +335,14 @@ pglogical_receiver_main(Datum main_arg)
335335
CommitTransactionCommand();
336336
}
337337

338-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s')",
338+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%lx')",
339339
slotName,
340340
(uint32) (originStartPos >> 32),
341341
(uint32) originStartPos,
342342
MULTIMASTER_MAX_PROTO_VERSION,
343343
MULTIMASTER_MIN_PROTO_VERSION,
344-
MtmReplicationModeName[mode]
344+
MtmReplicationModeName[mode],
345+
originStartPos
345346
);
346347
res = PQexec(conn, query->data);
347348
if (PQresultStatus(res) != PGRES_COPY_BOTH)

0 commit comments

Comments
 (0)