Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 990ccfc

Browse files
knizhnikkelvich
authored and committed
More recovery fixes
1 parent 839d890 commit 990ccfc

6 files changed

+50
-36
lines changed

arbiter.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -1003,7 +1003,7 @@ static void MtmTransReceiver(Datum arg)
10031003
}
10041004
}
10051005
}
1006-
if (Mtm->status != MTM_RECOVERY) {
1006+
if (Mtm->status == MTM_ONLINE) {
10071007
now = MtmGetSystemTime();
10081008
if (now > lastHeartbeatCheck + MSEC_TO_USEC(MtmHeartbeatRecvTimeout)) {
10091009
if (!MtmWatchdog(stopPolling)) {

multimaster.c

+7-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
#include "utils/builtins.h"
4141
#include "utils/memutils.h"
4242
#include "commands/dbcommands.h"
43-
#include "miscadmin.h"
4443
#include "postmaster/autovacuum.h"
4544
#include "storage/pmsignal.h"
4645
#include "storage/proc.h"
@@ -1214,12 +1213,16 @@ static void MtmEnableNode(int nodeId)
12141213

12151214
void MtmRecoveryCompleted(void)
12161215
{
1216+
int i;
12171217
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, reconnect mask=%lx, live nodes=%d",
12181218
MtmNodeId, Mtm->disabledNodeMask, Mtm->reconnectMask, Mtm->nLiveNodes);
12191219
MtmLock(LW_EXCLUSIVE);
12201220
Mtm->recoverySlot = 0;
12211221
Mtm->nodes[MtmNodeId-1].lastStatusChangeTime = MtmGetSystemTime();
12221222
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
1223+
for (i = 0; i < Mtm->nAllNodes; i++) {
1224+
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
1225+
}
12231226
/* Mode will be changed to online once all logical receivers are connected */
12241227
MtmSwitchClusterMode(MTM_CONNECTED);
12251228
MtmUnlock();
@@ -2297,7 +2300,7 @@ void MtmReceiverStarted(int nodeId)
22972300
* During recovery we need to open only one replication slot, from which the node should receive all transactions.
22982301
* Slots at other nodes should be removed
22992302
*/
2300-
MtmSlotMode MtmReceiverSlotMode(int nodeId)
2303+
MtmReplicationMode MtmGetReplicationMode(int nodeId)
23012304
{
23022305
bool recovery = false;
23032306
while (Mtm->status != MTM_CONNECTED && Mtm->status != MTM_ONLINE) {
@@ -2312,7 +2315,7 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
23122315
Mtm->recoveryCount += 1;
23132316
Mtm->pglogicalNodeMask = 0;
23142317
FinishAllPreparedTransactions(false);
2315-
return SLOT_OPEN_EXISTED;
2318+
return REPLMODE_RECOVERY;
23162319
}
23172320
}
23182321
/* delay opening of other slots until recovery is completed */
@@ -2324,7 +2327,7 @@ MtmSlotMode MtmReceiverSlotMode(int nodeId)
23242327
MTM_LOG2("%d: Reuse replication slot for node %d", MyProcPid, nodeId);
23252328
}
23262329
/* After recovery completion we need to drop all other slots to avoid receiving redundant data */
2327-
return recovery ? SLOT_CREATE_NEW : SLOT_OPEN_ALWAYS;
2330+
return recovery ? REPLMODE_RECOVERED : REPLMODE_NORMAL;
23282331
}
23292332

23302333
static bool MtmIsBroadcast()

multimaster.h

+5-5
Original file line numberDiff line numberDiff line change
@@ -117,10 +117,10 @@ typedef enum
117117

118118
typedef enum
119119
{
120-
SLOT_CREATE_NEW, /* create new slot (drop existed) */
121-
SLOT_OPEN_EXISTED, /* open existed slot */
122-
SLOT_OPEN_ALWAYS, /* open existed slot or create new if not exists */
123-
} MtmSlotMode;
120+
REPLMODE_RECOVERED, /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
121+
REPLMODE_RECOVERY, /* perform recovery of the node by applying all data from the slot from the specified point */
122+
REPLMODE_NORMAL /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
123+
} MtmReplicationMode;
124124

125125
typedef struct
126126
{
@@ -244,7 +244,7 @@ extern csn_t MtmAssignCSN(void);
244244
extern csn_t MtmSyncClock(csn_t csn);
245245
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot);
246246
extern void MtmReceiverStarted(int nodeId);
247-
extern MtmSlotMode MtmReceiverSlotMode(int nodeId);
247+
extern MtmReplicationMode MtmGetReplicationMode(int nodeId);
248248
extern void MtmExecute(void* work, int size);
249249
extern void MtmExecutor(int id, void* work, size_t size);
250250
extern void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd);

pglogical_output.c

+3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "utils/relcache.h"
4848
#include "utils/syscache.h"
4949
#include "utils/typcache.h"
50+
#include "miscadmin.h"
5051

5152
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
5253

@@ -155,6 +156,8 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
155156
{
156157
PGLogicalOutputData *data = palloc0(sizeof(PGLogicalOutputData));
157158

159+
elog(LOG, "%d: pg_decode_startup is_init=%d", MyProcPid, is_init);
160+
158161
data->context = AllocSetContextCreate(TopMemoryContext,
159162
"pglogical conversion context",
160163
ALLOCSET_DEFAULT_MINSIZE,

pglogical_proto.c

+1-2
Original file line numberDiff line numberDiff line change
@@ -433,8 +433,7 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
433433
PGLogicalProtoAPI *
434434
pglogical_init_api(PGLogicalProtoType typ)
435435
{
436-
PGLogicalProtoAPI* res = malloc(sizeof(PGLogicalProtoAPI));
437-
MemSet(res, 0, sizeof(PGLogicalProtoAPI));
436+
PGLogicalProtoAPI* res = palloc0(sizeof(PGLogicalProtoAPI));
438437
sscanf(MyReplicationSlot->data.name.data, MULTIMASTER_SLOT_PATTERN, &MtmReplicationNodeId);
439438
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d", MyProcPid, MyReplicationSlot->data.name.data, MtmReplicationNodeId);
440439
res->write_rel = pglogical_write_rel;

pglogical_receiver.c

+33-24
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,9 @@ feTimestampDifference(int64 start_time, int64 stop_time,
193193

194194
static char const* const MtmReplicationModeName[] =
195195
{
196-
"recovered", /* SLOT_CREATE_NEW: recovery of node is completed so drop old slot and restart replication from the current position in WAL */
197-
"recovery", /* SLOT_OPEN_EXISTED: perform recorvery of the node by applying all data from theslot from specified point */
198-
"normal" /* SLOT_OPEN_ALWAYS: normal mode: use existed slot or create new one and start receiving data from it from the specified position */
196+
"recovered", /* recovery of node is completed so drop old slot and restart replication from the current position in WAL */
197+
"recovery", /* perform recovery of the node by applying all data from the slot from the specified point */
198+
"normal" /* normal mode: use existed slot or create new one and start receiving data from it from the specified position */
199199
};
200200

201201
static void
@@ -206,7 +206,7 @@ pglogical_receiver_main(Datum main_arg)
206206
PQExpBuffer query;
207207
PGconn *conn;
208208
PGresult *res;
209-
MtmSlotMode mode;
209+
MtmReplicationMode mode;
210210

211211
ByteBuffer buf;
212212
XLogRecPtr originStartPos = 0;
@@ -251,7 +251,7 @@ pglogical_receiver_main(Datum main_arg)
251251
* During recovery we need to open only one replication slot, from which the node should receive all transactions.
252252
* Slots at other nodes should be removed
253253
*/
254-
mode = MtmReceiverSlotMode(nodeId);
254+
mode = MtmGetReplicationMode(nodeId);
255255
count = Mtm->recoveryCount;
256256

257257
/* Establish connection to remote server */
@@ -264,14 +264,19 @@ pglogical_receiver_main(Datum main_arg)
264264
}
265265

266266
query = createPQExpBuffer();
267-
268-
if (mode == SLOT_CREATE_NEW) {
267+
#if 1 /* Do we need to recretate slot ? */
268+
if (mode == REPLMODE_RECOVERED) { /* recreate slot */
269269
appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"", slotName);
270270
res = PQexec(conn, query->data);
271271
PQclear(res);
272272
resetPQExpBuffer(query);
273273
}
274-
if (mode != SLOT_OPEN_EXISTED) {
274+
#endif
275+
/* My original assumption was that we can perform recovery only from an existing slot,
276+
* but unfortunately it looks like slots can "disappear" together with the WAL-sender.
277+
* So let's always try to recreate the slot. */
278+
/* if (mode != REPLMODE_REPLICATION) */
279+
{
275280
appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", slotName, MULTIMASTER_NAME);
276281
res = PQexec(conn, query->data);
277282
if (PQresultStatus(res) != PGRES_TUPLES_OK)
@@ -291,24 +296,28 @@ pglogical_receiver_main(Datum main_arg)
291296
}
292297

293298
/* Start logical replication at specified position */
294-
StartTransactionCommand();
295-
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
296-
originId = replorigin_by_name(originName, true);
297-
if (originId == InvalidRepOriginId) {
298-
originId = replorigin_create(originName);
299-
/*
300-
* We are just creating new replication slot.
301-
* It is assumed that state of local and remote nodes is the same at this moment.
302-
* Them are either empty, either new node is synchronized using base_backup.
303-
* So we assume that LSNs are the same for local and remote node
304-
*/
305-
originStartPos = Mtm->status == MTM_RECOVERY ? GetXLogInsertRecPtr() : 0;
306-
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
299+
if (mode == REPLMODE_RECOVERED) {
300+
originStartPos = 0;
307301
} else {
308-
originStartPos = replorigin_get_progress(originId, false);
309-
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
302+
StartTransactionCommand();
303+
originName = psprintf(MULTIMASTER_SLOT_PATTERN, nodeId);
304+
originId = replorigin_by_name(originName, true);
305+
if (originId == InvalidRepOriginId) {
306+
originId = replorigin_create(originName);
307+
/*
308+
* We are just creating new replication slot.
309+
* It is assumed that state of local and remote nodes is the same at this moment.
310+
* They are either empty, or the new node is synchronized using base_backup.
311+
* So we assume that LSNs are the same for local and remote node
312+
*/
313+
originStartPos = Mtm->status == MTM_RECOVERY ? GetXLogInsertRecPtr() : 0;
314+
MTM_LOG1("Start logical receiver at position %lx from node %d", originStartPos, nodeId);
315+
} else {
316+
originStartPos = replorigin_get_progress(originId, false);
317+
MTM_LOG1("Restart logical receiver at position %lx with origin=%d from node %d", originStartPos, originId, nodeId);
318+
}
319+
CommitTransactionCommand();
310320
}
311-
CommitTransactionCommand();
312321

313322
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s')",
314323
slotName,

0 commit comments

Comments
 (0)