Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit c83f738

Browse files
knizhnikkelvich
authored and committed
Switch from recovery to normal mode
1 parent 31594e5 commit c83f738

File tree

5 files changed

+86
-15
lines changed

5 files changed

+86
-15
lines changed

multimaster.c

Lines changed: 59 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,19 @@ typedef struct {
6060
bool isReplicated; /* transaction on replica */
6161
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6262
bool containsDML; /* transaction contains DML statements */
63+
bool isPrepared; /* transaction was prepared for commit */
6364
csn_t snapshot; /* transaction snapshot */
6465
} MtmCurrentTrans;
6566

6667
/* #define USE_SPINLOCK 1 */
6768

69+
typedef enum
70+
{
71+
HASH_LOCK_ID,
72+
COMMIT_LOCK_ID,
73+
N_LOCKS
74+
} MtmLockIds;
75+
6876
#define MTM_SHMEM_SIZE (64*1024*1024)
6977
#define MTM_HASH_SIZE 100003
7078
#define USEC 1000000
@@ -150,7 +158,7 @@ void MtmLock(LWLockMode mode)
150158
#ifdef USE_SPINLOCK
151159
SpinLockAcquire(&dtm->spinlock);
152160
#else
153-
LWLockAcquire(dtm->hashLock, mode);
161+
LWLockAcquire(dtm->locks[HASH_LOCK_ID], mode);
154162
#endif
155163
}
156164

@@ -159,7 +167,7 @@ void MtmUnlock(void)
159167
#ifdef USE_SPINLOCK
160168
SpinLockRelease(&dtm->spinlock);
161169
#else
162-
LWLockRelease(dtm->hashLock);
170+
LWLockRelease(dtm->locks[HASH_LOCK_ID]);
163171
#endif
164172
}
165173

@@ -412,7 +420,7 @@ static void MtmInitialize()
412420
{
413421
dtm->status = MTM_INITIALIZATION;
414422
dtm->recoverySlot = 0;
415-
dtm->hashLock = (LWLock*)GetNamedLWLockTranche(MULTIMASTER_NAME);
423+
dtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);
416424
dtm->csn = MtmGetCurrentTime();
417425
dtm->oldestXid = FirstNormalTransactionId;
418426
dtm->nNodes = MtmNodes;
@@ -423,6 +431,7 @@ static void MtmInitialize()
423431
dtm->transListTail = &dtm->transListHead;
424432
dtm->nReceivers = 0;
425433
dtm->timeShift = 0;
434+
pg_atomic_write_u32(&dtm->nCommittingTrans, 0);
426435
PGSemaphoreCreate(&dtm->votingSemaphore);
427436
PGSemaphoreReset(&dtm->votingSemaphore);
428437
SpinLockInit(&dtm->spinlock);
@@ -467,6 +476,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
467476
x->xid = GetCurrentTransactionIdIfAny();
468477
x->isReplicated = false;
469478
x->isDistributed = IsNormalProcessingMode() && dtm->status == MTM_ONLINE && MtmDoReplication && !am_walsender && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess();
479+
x->isPrepared = false;
470480
x->containsDML = false;
471481
x->snapshot = MtmAssignCSN();
472482
x->gtid.xid = InvalidTransactionId;
@@ -476,6 +486,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
476486
}
477487
}
478488

489+
479490
/*
480491
* We need to pass snapshot to WAL-sender, so create record in transaction status hash table
481492
* before commit
@@ -488,8 +499,14 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
488499
if (!x->isDistributed) {
489500
return;
490501
}
491-
x->xid = GetCurrentTransactionId();
502+
/* Check that commits are not disabled */
503+
LWLockAcquire(dtm->locks[COMMIT_LOCK_ID], LW_SHARED);
504+
LWLockRelease(dtm->locks[COMMIT_LOCK_ID]);
492505

506+
pg_atomic_fetch_add_u32(dtm->nCommittingTransactions, 1);
507+
x->isPrepared = true;
508+
x->xid = GetCurrentTransactionId();
509+
493510
MtmLock(LW_EXCLUSIVE);
494511
ts = hash_search(xid2state, &x->xid, HASH_ENTER, NULL);
495512
ts->status = TRANSACTION_STATUS_IN_PROGRESS;
@@ -500,6 +517,7 @@ static void MtmPrepareTransaction(MtmCurrentTrans* x)
500517
ts->procno = MyProc->pgprocno;
501518
ts->nVotes = 0;
502519
ts->done = false;
520+
503521
if (TransactionIdIsValid(x->gtid.xid)) {
504522
ts->gtid = x->gtid;
505523
} else {
@@ -528,6 +546,9 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
528546
MtmAdjustSubtransactions(ts);
529547
MtmUnlock();
530548
}
549+
if (x->isPrepared) {
550+
pg_atomic_fetch_add_u32(dtm->nCommittingTransactions, -1);
551+
}
531552
x->snapshot = INVALID_CSN;
532553
x->xid = InvalidTransactionId;
533554
x->gtid.xid = InvalidTransactionId;
@@ -547,6 +568,39 @@ void MtmSendNotificationMessage(MtmTransState* ts)
547568
}
548569
}
549570

571+
void MtmUpdateStatus(bool recovered)
572+
{
573+
if (dtm->status == MTM_RECOVERY) {
574+
MtmLock(LW_EXCLUSIVE);
575+
dtm->status = MTM_ONLINE; /* Is this all we should do to switch to normal state? */
576+
MtmUnlock();
577+
}
578+
}
579+
580+
void MtmRecoveryCompleted(int nodeId)
581+
{
582+
if (BIT_CHECK(dtm->pglogicalNodeMask, nodeId-1)) {
583+
if (MyWalSnd->sentPtr == GetXLogInsertRecPtr()) {
584+
/* Ok, now we are done with recovery of node */
585+
MtmLock(LW_EXCLUSIVE);
586+
dtm->pglogicalNodeMask &= (int64)1 << (nodeId-1); /* now node is assumed as recovered */
587+
dtm->nNodes += 1;
588+
MtmUnlock();
589+
590+
LWLockRelease(dtm->locks[COMMIT_LOCK_ID]); /* enable commits */
591+
592+
return true;
593+
} else if (MyWalSnd->sentPtr + MtmSlotDelayThreashold > GetXLogInsertRecPtr()) {
594+
/* we are almost done with recovery of node */
595+
LWLockAcquire(dtm->locks[COMMIT_LOCK_ID], LW_EXCLUSIVE); /* disable new commits */
596+
}
597+
return false;
598+
} else {
599+
return true;
600+
}
601+
}
602+
603+
550604
static bool
551605
MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
552606
{
@@ -803,7 +857,7 @@ _PG_init(void)
803857
* resources in mtm_shmem_startup().
804858
*/
805859
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmQueueSize);
806-
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1);
860+
RequestNamedLWLockTranche(MULTIMASTER_NAME, N_LOCKS);
807861

808862
MtmNodes = MtmStartReceivers(MtmConnStrs, MtmNodeId);
809863
if (MtmNodes < 2) {

multimaster.h

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@
1414

1515
#define BIT_CHECK(mask, bit) ((mask) & ((int64)1 << (bit)))
1616

17-
#define MULTIMASTER_NAME "mtm"
18-
#define MULTIMASTER_SCHEMA_NAME "mtm"
19-
#define MULTIMASTER_DDL_TABLE "ddl_log"
20-
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
17+
#define MULTIMASTER_NAME "mtm"
18+
#define MULTIMASTER_SCHEMA_NAME "mtm"
19+
#define MULTIMASTER_DDL_TABLE "ddl_log"
20+
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
21+
#define MULTIMASTER_MIN_PROTO_VERSION 1
22+
#define MULTIMASTER_MAX_PROTO_VERSION 1
2123

2224
#define Natts_mtm_ddl_log 2
2325
#define Anum_mtm_ddl_log_issued 1
@@ -91,14 +93,15 @@ typedef struct
9193
int recoverySlot; /* NodeId of recovery slot or 0 if none */
9294
volatile slock_t spinlock; /* spinlock used to protect access to hash table */
9395
PGSemaphoreData votingSemaphore; /* semaphore used to notify mtm-sender about new responses to coordinator */
94-
LWLockId hashLock; /* lock to synchronize access to hash table */
96+
LWLockPadded *locks; /* multimaster lock tranche */
9597
TransactionId oldestXid; /* XID of oldest transaction visible by any active transaction (local or global) */
9698
int64 disabledNodeMask; /* bitmask of disabled nodes (so no more than 64 nodes in multimaster:) */
9799
int64 pglogicalNodeMask; /* bitmask of started pglogical receivers */
98100
int nNodes; /* number of active nodes */
99101
int nReceivers; /* number of initialized logical receivers (used to determine moment when Mtm initialization is completed) */
100102
long timeShift; /* local time correction */
101103
csn_t csn; /* last obtained CSN: used to provide unique ascending CSNs based on system time */
104+
pg_atomic_uint32 nCommittingTrans; /* number of transactions in process of commit */
102105
MtmTransState* votingTransactions; /* L1-list of replicated transactions sendings notifications to coordinator.
103106
This list is used to pass information to mtm-sender BGW */
104107
MtmTransState* transListHead; /* L1 list of all finished transactions present in xid2state hash.
@@ -137,6 +140,7 @@ extern void MtmUnlock(void);
137140
extern void MtmDropNode(int nodeId, bool dropSlot);
138141
extern MtmState* MtmGetState(void);
139142
extern timestamp_t MtmGetCurrentTime(void);
140-
extern void MtmSleep(timestamp_t interval);
141-
143+
extern void MtmSleep(timestamp_t interval);
144+
extern void MtmRecoveryCompleted(int nodeId);
145+
extern void MtmUpdateStatus(bool recovered);
142146
#endif

pglogical_apply.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,9 @@ read_rel(StringInfo s, LOCKMODE mode)
466466
static void
467467
process_remote_commit(StringInfo s)
468468
{
469+
bool recovered = pq_getmsgbyte(s);
469470
CommitTransactionCommand();
471+
MtmUpdateStatus(recovered);
470472
}
471473

472474
static void

pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
129129
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
130130
if (!mm->isLocal) {
131131
pq_sendbyte(out, 'C'); /* sending COMMIT */
132+
pq_sendbyte(out, MtmRecoveryCompleted(mm->nodeId));
132133
}
133134
}
134135

pglogical_receiver.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "utils/guc.h"
3232
#include "utils/snapmgr.h"
3333
#include "executor/spi.h"
34+
#include "replication/origin.h"
3435

3536
#include "multimaster.h"
3637

@@ -203,10 +204,12 @@ pglogical_receiver_main(Datum main_arg)
203204
PGconn *conn;
204205
PGresult *res;
205206
MtmSlotMode mode;
207+
MtmState* ds;
206208
#ifndef USE_PGLOGICAL_OUTPUT
207209
bool insideTrans = false;
208210
#endif
209211
ByteBuffer buf;
212+
XLogRecPtr originStartPos;
210213

211214
/* Register functions for SIGTERM/SIGHUP management */
212215
pqsignal(SIGHUP, receiver_raw_sighup);
@@ -258,8 +261,14 @@ pglogical_receiver_main(Datum main_arg)
258261
resetPQExpBuffer(query);
259262
}
260263
/* Start logical replication at specified position */
261-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL 0/0 (\"startup_params_format\" '1', \"max_proto_version\" '1', \"min_proto_version\" '1')",
262-
args->receiver_slot);
264+
originStartPos = replorigin_session_get_progress(false);
265+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %u/%u (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d')",
266+
args->receiver_slot,
267+
(uint32) (originStartPos >> 32),
268+
(uint32) originStartPos,
269+
MULTIMASTER_MAX_PROTO_VERSION,
270+
MULTIMASTER_MIN_PROTO_VERSION
271+
);
263272
res = PQexec(conn, query->data);
264273
if (PQresultStatus(res) != PGRES_COPY_BOTH)
265274
{
@@ -273,6 +282,7 @@ pglogical_receiver_main(Datum main_arg)
273282

274283
MtmReceiverStarted(args->receiver_node);
275284
ByteBufferAlloc(&buf);
285+
ds = MtmGetState();
276286

277287
while (!got_sigterm)
278288
{
@@ -366,7 +376,7 @@ pglogical_receiver_main(Datum main_arg)
366376
* If sync mode is sent reply in all cases to ensure that
367377
* server knows how far replay has been done.
368378
*/
369-
if (replyRequested || receiver_sync_mode)
379+
if (replyRequested || receiver_sync_mode || ds->status == MTM_RECOVERY)
370380
{
371381
int64 now = feGetCurrentTimestamp();
372382

0 commit comments

Comments
 (0)