Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f051d35

Browse files
knizhnikkelvich
authored andcommitted
Add recover_node function
1 parent e0105f6 commit f051d35

File tree

8 files changed

+43
-9
lines changed

8 files changed

+43
-9
lines changed

arbiter.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,15 +119,15 @@ static BackgroundWorker MtmSender = {
119119
"mtm-sender",
120120
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, /* do not need connection to the database */
121121
BgWorkerStart_ConsistentState,
122-
1, /* restart in one second (is it possible to restart immediately?) */
122+
MULTIMASTER_BGW_RESTART_TIMEOUT,
123123
MtmTransSender
124124
};
125125

126126
static BackgroundWorker MtmRecevier = {
127127
"mtm-receiver",
128128
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION, /* do not need connection to the database */
129129
BgWorkerStart_ConsistentState,
130-
1, /* restart in one second (is it possible to restart immediately?) */
130+
MULTIMASTER_BGW_RESTART_TIMEOUT,
131131
MtmTransReceiver
132132
};
133133

@@ -297,6 +297,7 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
297297

298298
/* Some node considered that I am dead, so switch to recovery mode */
299299
if (BIT_CHECK(msg.disabledNodeMask, MtmNodeId-1)) {
300+
elog(WARNING, "Node %d think that I am dead", msg.node);
300301
MtmSwitchClusterMode(MTM_RECOVERY);
301302
}
302303
/* Combine disable masks from all node. Is it actually correct or we should better check availability of nodes ourselves? */

bgwpool.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
8585
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
8686
worker.bgw_start_time = BgWorkerStart_ConsistentState;
8787
worker.bgw_main = BgwPoolMainLoop;
88-
worker.bgw_restart_time = 10; /* Wait 10 seconds for restart before crash */
88+
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
8989

9090
for (i = 0; i < nWorkers; i++) {
9191
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));

bgwpool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
99

1010
#define MAX_DBNAME_LEN 30
11+
#define MULTIMASTER_BGW_RESTART_TIMEOUT 10 /* seconds */
1112

1213
typedef struct
1314
{

multimaster--1.0.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS 'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16+
-- Create replication slot for the node which was previously dropped together with it's slot
17+
CREATE FUNCTION mtm.recover_node(node integer) RETURNS void
18+
AS 'MODULE_PATHNAME','mtm_recover_node'
19+
LANGUAGE C;
20+
21+
1622
CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
1723
AS 'MODULE_PATHNAME','mtm_get_snapshot'
1824
LANGUAGE C;

multimaster.c

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ PG_MODULE_MAGIC;
8989
PG_FUNCTION_INFO_V1(mtm_start_replication);
9090
PG_FUNCTION_INFO_V1(mtm_stop_replication);
9191
PG_FUNCTION_INFO_V1(mtm_drop_node);
92+
PG_FUNCTION_INFO_V1(mtm_recover_node);
9293
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
9394

9495
static Snapshot MtmGetSnapshot(Snapshot snapshot);
@@ -1182,6 +1183,22 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
11821183
return dtm->recoverySlot ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
11831184
}
11841185

1186+
void MtmRecoverNode(int nodeId)
1187+
{
1188+
if (nodeId <= 0 || nodeId > dtm->nNodes)
1189+
{
1190+
elog(ERROR, "NodeID %d is out of range [1,%d]", nodeId, dtm->nNodes);
1191+
}
1192+
if (!BIT_CHECK(dtm->disabledNodeMask, nodeId-1)) {
1193+
elog(ERROR, "Node %d was not disabled", nodeId);
1194+
}
1195+
if (!IsTransactionBlock())
1196+
{
1197+
MtmBroadcastUtilityStmt(psprintf("select pg_create_logical_replication_slot('" MULTIMASTER_SLOT_PATTERN "', '" MULTIMASTER_NAME "')", nodeId), true);
1198+
}
1199+
}
1200+
1201+
11851202
void MtmDropNode(int nodeId, bool dropSlot)
11861203
{
11871204
if (!BIT_CHECK(dtm->disabledNodeMask, nodeId-1))
@@ -1227,6 +1244,14 @@ mtm_drop_node(PG_FUNCTION_ARGS)
12271244
PG_RETURN_VOID();
12281245
}
12291246

1247+
Datum
1248+
mtm_recover_node(PG_FUNCTION_ARGS)
1249+
{
1250+
int nodeId = PG_GETARG_INT32(0);
1251+
MtmRecoverNode(nodeId);
1252+
PG_RETURN_VOID();
1253+
}
1254+
12301255
Datum
12311256
mtm_get_snapshot(PG_FUNCTION_ARGS)
12321257
{
@@ -1599,7 +1624,7 @@ MtmSerializeLock(PROCLOCK* proclock, void* arg)
15991624
{
16001625
if ((proclock->holdMask & LOCKBIT_ON(lm)) && (conflictMask & LOCKBIT_ON(lm)))
16011626
{
1602-
MTM_TRACE("%d: %u(%u) waits for %u(%u)\n", getpid(), srcPgXact->xid, proc->pid, dstPgXact->xid, proclock->tag.myProc->pid);
1627+
MTM_TRACE("%d: %u(%u) waits for %u(%u)\n", MyProcPid, srcPgXact->xid, proc->pid, dstPgXact->xid, proclock->tag.myProc->pid);
16031628
MtmGetGtid(srcPgXact->xid, &gtid); /* transaction holding lock */
16041629
ByteBufferAppendInt32(buf, gtid.node);
16051630
ByteBufferAppendInt32(buf, gtid.xid);
@@ -1689,6 +1714,7 @@ void MtmRefreshClusterStatus(bool nowait)
16891714

16901715
clique = MtmFindMaxClique(matrix, MtmNodes, &clique_size);
16911716
if (clique_size >= MtmNodes/2+1) { /* have quorum */
1717+
elog(WARNING, "Find clique %lx, disabledNodeMask %lx", clique, dtm->disabledNodeMask);
16921718
MtmLock(LW_EXCLUSIVE);
16931719
mask = ~clique & (((nodemask_t)1 << MtmNodes)-1) & ~dtm->disabledNodeMask; /* new disabled nodes mask */
16941720
for (i = 0; mask != 0; i++, mask >>= 1) {

multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#define MTM_TUPLE_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
1414
*/
1515

16-
#define MULTIMASTER_NAME "mtm"
16+
#define MULTIMASTER_NAME "multimaster"
1717
#define MULTIMASTER_SCHEMA_NAME "mtm"
1818
#define MULTIMASTER_DDL_TABLE "ddl_log"
1919
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
@@ -146,6 +146,7 @@ extern void MtmAdjustSubtransactions(MtmTransState* ts);
146146
extern void MtmLock(LWLockMode mode);
147147
extern void MtmUnlock(void);
148148
extern void MtmDropNode(int nodeId, bool dropSlot);
149+
extern void MtmRecoverNode(int nodeId);
149150
extern void MtmOnNodeDisconnect(int nodeId);
150151
extern void MtmOnNodeConnect(int nodeId);
151152
extern MtmState* MtmGetState(void);

pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -830,7 +830,7 @@ void MtmExecutor(int id, void* work, size_t size)
830830
PG_CATCH();
831831
{
832832
FlushErrorState();
833-
MTM_TRACE("%d: REMOTE abort transaction %d\n", getpid(), GetCurrentTransactionId());
833+
MTM_TRACE("%d: REMOTE abort transaction %d\n", MyProcPid, GetCurrentTransactionId());
834834
AbortCurrentTransaction();
835835
}
836836
PG_END_TRY();

pglogical_receiver.c

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ static int receiver_idle_time = 0;
5656
static bool receiver_sync_mode = false;
5757

5858
/* Worker name */
59-
static char *worker_name = "multimaster";
6059
char worker_proc[BGW_MAXLEN];
6160

6261
/* Lastly written positions */
@@ -252,7 +251,7 @@ pglogical_receiver_main(Datum main_arg)
252251
resetPQExpBuffer(query);
253252
}
254253
if (mode != SLOT_OPEN_EXISTED) {
255-
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", args->receiver_slot, worker_name);
254+
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", args->receiver_slot, MULTIMASTER_NAME);
256255
res = PQexec(conn, query->data);
257256
if (PQresultStatus(res) != PGRES_TUPLES_OK)
258257
{
@@ -568,7 +567,7 @@ int MtmStartReceivers(char* conns, int node_id)
568567
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
569568
worker.bgw_start_time = BgWorkerStart_ConsistentState;
570569
worker.bgw_main = pglogical_receiver_main;
571-
worker.bgw_restart_time = 10; /* Wait 10 seconds for restart before crash */
570+
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
572571

573572
while (conn_str < conn_str_end) {
574573
char* p = strchr(conn_str, ',');

0 commit comments

Comments
 (0)