
Commit b908eaf

Use separate worker pools in each receiver.

relid_map assumes that a backend works with only one remote server, which did not hold with the shared pool. It would have been possible to add node info to relid_map, but with separate per-receiver pools we can also simplify several other things (no need to switch the replication session, etc.).

1 parent ff11e56, commit b908eaf
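The core of the change, condensed from the diffs below: apply work used to be pushed into the single shared pool, and now goes into the pool owned by the slot of the node whose stream the receiver consumes. A minimal sketch of the new dispatch path (the old call appears in the comment; declarations are elided):

/* Before (multimaster.c): every receiver shared one pool, so an apply
 * worker could interleave transactions from different origin nodes and
 * its relid_map cache could refer to the wrong remote server:
 *
 *     BgwPoolExecute(&Mtm->pool, work, size);
 *
 * After (pglogical_receiver.c): each receiver dispatches into its own
 * node slot's pool, so a worker serves exactly one origin and never has
 * to switch its replication session. */
static void
MtmExecute(void* work, int size)
{
    if (Mtm->status == MTM_RECOVERY)
        MtmExecutor(work, size);    /* apply sequentially to preserve commit order */
    else
        BgwPoolExecute(&Mtm->nodes[MtmReplicationNodeId-1].pool, work, size);
}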

File tree: 6 files changed, +72 −65

  contrib/mmts/bgwpool.c
  contrib/mmts/bgwpool.h
  contrib/mmts/multimaster.c
  contrib/mmts/multimaster.h
  contrib/mmts/pglogical_receiver.c
  src/include/storage/proc.h

contrib/mmts/bgwpool.c
Lines changed: 11 additions & 9 deletions

@@ -26,7 +26,7 @@ void BgwPoolDynamicWorkerMainLoop(Datum arg);
 
 static void BgwShutdownWorker(int sig)
 {
-    MTM_LOG1("Background worker %d receive shutdown request", MyProcPid);
+    MTM_LOG1("Background worker %d received shutdown request", MyProcPid);
     if (MtmPool) {
         BgwPoolStop(MtmPool);
     }
@@ -137,16 +137,15 @@ timestamp_t BgwGetLastPeekTime(BgwPool* pool)
 
 void BgwPoolStaticWorkerMainLoop(Datum arg)
 {
-    BgwPoolConstructor constructor = (BgwPoolConstructor)DatumGetPointer(arg);
-    BgwPoolMainLoop(constructor());
+    BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
 }
 
 void BgwPoolDynamicWorkerMainLoop(Datum arg)
 {
-    BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
+    BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
 }
 
-void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
+void BgwPoolStart(BgwPool* pool, char *poolName)
 {
     int i;
     BackgroundWorker worker;
@@ -158,9 +157,12 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
     sprintf(worker.bgw_function_name, "BgwPoolStaticWorkerMainLoop");
     worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
 
-    for (i = 0; i < nWorkers; i++) {
-        snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_worker_%d", i+1);
-        worker.bgw_main_arg = PointerGetDatum(constructor);
+    strncpy(pool->poolName, poolName, MAX_NAME_LEN);
+
+    for (i = 0; i < pool->nWorkers; i++)
+    {
+        snprintf(worker.bgw_name, BGW_MAXLEN, "%s_worker_%d", pool->poolName, i+1);
+        worker.bgw_main_arg = PointerGetDatum(pool);
         RegisterBackgroundWorker(&worker);
     }
 }
@@ -189,7 +191,7 @@ static void BgwStartExtraWorker(BgwPool* pool)
     sprintf(worker.bgw_library_name, "multimaster");
     sprintf(worker.bgw_function_name, "BgwPoolDynamicWorkerMainLoop");
     worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
-    snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_dynworker_%d", (int)++pool->nWorkers);
+    snprintf(worker.bgw_name, BGW_MAXLEN, "%s-dynworker-%d", pool->poolName, (int)++pool->nWorkers);
     worker.bgw_main_arg = PointerGetDatum(pool);
     pool->lastDynamicWorkerStartTime = now;
     if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
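Under the new signature, a pool is sized at init time and then started by its owner. A hedged usage sketch (the node index and the name string are illustrative, not taken from this diff):

/* Sketch: bringing up one receiver's pool under the new API (illustrative
 * values). BgwPoolInit fills in the queue and database settings;
 * BgwPoolStart stamps the pool name and registers pool->nWorkers static
 * workers, each getting the pool pointer itself as bgw_main_arg. With
 * nWorkers = 0, as in MtmInitialize below, the pool relies entirely on
 * dynamically started workers (BgwStartExtraWorker). */
BgwPool *pool = &Mtm->nodes[nodeId - 1].pool;

BgwPoolInit(pool, MtmExecutor, MtmDatabaseName, MtmDatabaseUser, MtmQueueSize, 0);
BgwPoolStart(pool, "mtm-logrep-receiver-1-2");

/* Caveat: strncpy copies at most MAX_NAME_LEN bytes and does not
 * NUL-terminate longer names, so pool names should stay short. */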

contrib/mmts/bgwpool.h
Lines changed: 3 additions & 1 deletion

@@ -13,6 +13,7 @@ typedef ulong64 timestamp_t;
 
 #define MAX_DBNAME_LEN 30
 #define MAX_DBUSER_LEN 30
+#define MAX_NAME_LEN 30
 #define MULTIMASTER_BGW_RESTART_TIMEOUT BGW_NEVER_RESTART /* seconds */
 
 extern timestamp_t MtmGetSystemTime(void); /* non-adjusted current system time */
@@ -37,14 +38,15 @@ typedef struct
     timestamp_t lastDynamicWorkerStartTime;
     bool producerBlocked;
     bool shutdown;
+    char poolName[MAX_NAME_LEN];
     char dbname[MAX_DBNAME_LEN];
     char dbuser[MAX_DBUSER_LEN];
     char* queue;
 } BgwPool;
 
 typedef BgwPool*(*BgwPoolConstructor)(void);
 
-extern void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor);
+extern void BgwPoolStart(BgwPool* pool, char *poolName);
 
 extern void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers);
contrib/mmts/multimaster.c
Lines changed: 36 additions & 48 deletions

@@ -176,7 +176,6 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId *subxids, int
 
 static void MtmShmemStartup(void);
 
-static BgwPool* MtmPoolConstructor(void);
 static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
 static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError, int forceOnNode);
 static void MtmProcessDDLCommand(char const* queryString, bool transactional);
@@ -266,12 +265,12 @@ bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some
 bool MtmMajorNode;
 char* MtmRefereeConnStr;
 bool MtmEnforceLocalTx;
+int MtmWorkers;
 
 static char* MtmConnStrs;
 static char* MtmRemoteFunctionsList;
 static char* MtmClusterName;
 static int MtmQueueSize;
-static int MtmWorkers;
 static int MtmVacuumDelay;
 static int MtmMinRecoveryLag;
 static int MtmMaxRecoveryLag;
@@ -2558,7 +2557,8 @@ static void MtmInitialize()
     Mtm->inject2PCError = 0;
     Mtm->sendQueue = NULL;
     Mtm->freeQueue = NULL;
-    for (i = 0; i < MtmNodes; i++) {
+    for (i = 0; i < MtmMaxNodes; i++)
+    {
         Mtm->nodes[i].oldestSnapshot = 0;
         Mtm->nodes[i].disabledNodeMask = 0;
         Mtm->nodes[i].connectivityMask = (((nodemask_t)1 << MtmNodes) - 1);
@@ -2576,14 +2576,14 @@ static void MtmInitialize()
         Mtm->nodes[i].nHeartbeats = 0;
         Mtm->nodes[i].manualRecovery = false;
         Mtm->nodes[i].slotDeleted = false;
+        BgwPoolInit(&Mtm->nodes[i].pool, MtmExecutor, MtmDatabaseName, MtmDatabaseUser, MtmQueueSize, 0);
     }
     Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
     /* All transaction originated from the current node should be ignored during recovery */
     Mtm->nodes[MtmNodeId-1].restartLSN = (lsn_t)PG_UINT64_MAX;
     Mtm->sendSemaphore = PGSemaphoreCreate();
     PGSemaphoreReset(Mtm->sendSemaphore);
     SpinLockInit(&Mtm->queueSpinlock);
-    BgwPoolInit(&Mtm->pool, MtmExecutor, MtmDatabaseName, MtmDatabaseUser, MtmQueueSize, MtmWorkers);
     RegisterXactCallback(MtmXactCallback, NULL);
     MtmTx.snapshot = INVALID_CSN;
     MtmTx.xid = InvalidTransactionId;
@@ -2754,7 +2754,7 @@ static void MtmSplitConnStrs(void)
         MTM_ELOG(ERROR, "More than %d nodes are specified", MtmMaxNodes);
     }
     MtmNodes = i;
-    MtmConnections = (MtmConnectionInfo*)palloc(MtmMaxNodes*sizeof(MtmConnectionInfo));
+    MtmConnections = (MtmConnectionInfo*)palloc0(MtmMaxNodes*sizeof(MtmConnectionInfo));
 
     if (f != NULL) {
         fseek(f, SEEK_SET, 0);
@@ -3369,11 +3369,9 @@ _PG_init(void)
      * the postmaster process.) We'll allocate or attach to the shared
      * resources in mtm_shmem_startup().
      */
-    RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmQueueSize);
+    RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize);
     RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2);
 
-    BgwPoolStart(MtmWorkers, MtmPoolConstructor);
-
     MtmArbiterInitialize();
 
     /*
@@ -4313,18 +4311,30 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
     TupleDesc desc;
     Datum values[Natts_mtm_cluster_state];
     bool nulls[Natts_mtm_cluster_state] = {false};
+    int i,
+        pool_active = 0,
+        pool_pending = 0,
+        pool_queue_size = 0;
+
     get_call_result_type(fcinfo, NULL, &desc);
 
+    for (i = 0; i < Mtm->nAllNodes; i++)
+    {
+        pool_active += (int) Mtm->nodes[i].pool.active;
+        pool_pending += (int) Mtm->nodes[i].pool.pending;
+        pool_queue_size += (int) BgwPoolGetQueueSize(&Mtm->nodes[i].pool);
+    }
+
     values[0] = Int32GetDatum(MtmNodeId);
     values[1] = CStringGetTextDatum(MtmNodeStatusMnem[Mtm->status]);
     values[2] = Int64GetDatum(Mtm->disabledNodeMask);
     values[3] = Int64GetDatum(SELF_CONNECTIVITY_MASK);
     values[4] = Int64GetDatum(Mtm->originLockNodeMask);
     values[5] = Int32GetDatum(Mtm->nLiveNodes);
     values[6] = Int32GetDatum(Mtm->nAllNodes);
-    values[7] = Int32GetDatum((int)Mtm->pool.active);
-    values[8] = Int32GetDatum((int)Mtm->pool.pending);
-    values[9] = Int64GetDatum(BgwPoolGetQueueSize(&Mtm->pool));
+    values[7] = Int32GetDatum(pool_active);
+    values[8] = Int32GetDatum(pool_pending);
+    values[9] = Int64GetDatum(pool_queue_size);
     values[10] = Int64GetDatum(Mtm->transCount);
     values[11] = Int64GetDatum(Mtm->timeShift);
     values[12] = Int32GetDatum(Mtm->recoverySlot);
@@ -5608,28 +5618,6 @@ static void MtmSeqNextvalHook(Oid seqid, int64 next)
     }
 }
 
-/*
- * -------------------------------------------
- * Executor pool interface
- * -------------------------------------------
- */
-
-void MtmExecute(void* work, int size)
-{
-    if (Mtm->status == MTM_RECOVERY) {
-        /* During recovery apply changes sequentially to preserve commit order */
-        MtmExecutor(work, size);
-    } else {
-        BgwPoolExecute(&Mtm->pool, work, size);
-    }
-}
-
-static BgwPool*
-MtmPoolConstructor(void)
-{
-    return &Mtm->pool;
-}
-
 /*
  * -------------------------------------------
  * Deadlock detection
@@ -5743,21 +5731,21 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
     MtmGetGtid(xid, &gtid);
     hasDeadlock = MtmGraphFindLoop(&graph, &gtid);
     MTM_ELOG(LOG, "Distributed deadlock check by backend %d for %u:%llu = %d", MyProcPid, gtid.node, (long64)gtid.xid, hasDeadlock);
-    if (!hasDeadlock) {
-        /* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
-         * can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
-         * refelected in lock graph
-         */
-        timestamp_t lastPeekTime = BgwGetLastPeekTime(&Mtm->pool);
-        if (lastPeekTime != 0 && MtmGetSystemTime() - lastPeekTime >= MSEC_TO_USEC(DeadlockTimeout)) {
-            hasDeadlock = true;
-            MTM_ELOG(WARNING, "Apply workers were blocked more than %d msec",
-                     (int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
-        } else {
-            MTM_LOG1("Enable deadlock timeout in backend %d for transaction %llu", MyProcPid, (long64)xid);
-            enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
-        }
-    }
+    // if (!hasDeadlock) {
+    //     /* There is no deadlock loop in graph, but deadlock can be caused by lack of apply workers: if all of them are busy, then some transactions
+    //      * can not be appied just because there are no vacant workers and it cause additional dependency between transactions which is not
+    //      * refelected in lock graph
+    //      */
+    //     timestamp_t lastPeekTime = minBgwGetLastPeekTime(&Mtm->pool);
+    //     if (lastPeekTime != 0 && MtmGetSystemTime() - lastPeekTime >= MSEC_TO_USEC(DeadlockTimeout)) {
+    //         hasDeadlock = true;
+    //         MTM_ELOG(WARNING, "Apply workers were blocked more than %d msec",
+    //                  (int)USEC_TO_MSEC(MtmGetSystemTime() - lastPeekTime));
+    //     } else {
+    //         MTM_LOG1("Enable deadlock timeout in backend %d for transaction %llu", MyProcPid, (long64)xid);
+    //         enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
+    //     }
+    // }
     }
     return hasDeadlock;
 }
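One consequence worth noting: every node slot now carries its own queue, so `_PG_init` must reserve queue space per potential node rather than once. A sketch of the arithmetic, with assumed (not default) values:

/* Illustrative accounting, assuming MtmMaxNodes = 8 and MtmQueueSize = 256 kB:
 *   old request: MTM_SHMEM_SIZE +     256 kB   (one shared queue)
 *   new request: MTM_SHMEM_SIZE + 8 * 256 kB   (one queue per node slot)
 * add_size/mul_size are PostgreSQL's overflow-checked helpers for
 * shared-memory sizing; the diff uses plain arithmetic instead. */
RequestAddinShmemSpace(add_size(MTM_SHMEM_SIZE,
                                mul_size((Size) MtmMaxNodes, (Size) MtmQueueSize)));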

contrib/mmts/multimaster.h
Lines changed: 3 additions & 2 deletions

@@ -223,6 +223,8 @@ typedef struct
 typedef struct
 {
     MtmConnectionInfo con;
+    /* Pool of background workers for applying logical replication */
+    BgwPool pool;
     timestamp_t transDelay;
     timestamp_t lastStatusChangeTime;
     timestamp_t receiverStartTime;
@@ -338,7 +340,6 @@ typedef struct
     MtmMessageQueue* sendQueue; /* Messages to be sent by arbiter sender */
     MtmMessageQueue* freeQueue; /* Free messages */
     lsn_t recoveredLSN; /* LSN at the moment of recovery completion */
-    BgwPool pool; /* Pool of background workers for applying logical replication patches */
     MtmNodeInfo nodes[1]; /* [Mtm->nAllNodes]: per-node data */
 } MtmState;
 
@@ -394,6 +395,7 @@ extern bool MtmBackgroundWorker;
 extern char* MtmRefereeConnStr;
 extern bool MtmEnforceLocalTx;
 extern bool MtmIsRecoverySession;
+extern int MtmWorkers;
 
 
 extern void MtmArbiterInitialize(void);
@@ -404,7 +406,6 @@ extern csn_t MtmAssignCSN(void);
 extern csn_t MtmSyncClock(csn_t csn);
 extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot, nodemask_t participantsMask);
 extern MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown);
-extern void MtmExecute(void* work, int size);
 extern void MtmExecutor(void* work, size_t size);
 extern void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd);
 extern void MtmSendMessage(MtmArbiterMessage* msg);

contrib/mmts/pglogical_receiver.c
Lines changed: 16 additions & 3 deletions

@@ -212,6 +212,17 @@ static char const* const MtmReplicationModeName[] =
     "open_existed" /* normal mode: use existed slot or create new one and start receiving data from it from the rememered position */
 };
 
+static void
+MtmExecute(void* work, int size)
+{
+    /* During recovery apply changes sequentially to preserve commit order */
+    if (Mtm->status == MTM_RECOVERY)
+        MtmExecutor(work, size);
+    else
+        BgwPoolExecute(&Mtm->nodes[MtmReplicationNodeId-1].pool, work, size);
+}
+
+
 void
 pglogical_receiver_main(Datum main_arg)
 {
@@ -252,7 +263,7 @@ pglogical_receiver_main(Datum main_arg)
     Mtm->nodes[nodeId-1].receiverStartTime = MtmGetSystemTime();
     MtmReplicationNodeId = nodeId;
 
-    sprintf(worker_proc, "mtm_pglogical_receiver_%d_%d", MtmNodeId, nodeId);
+    snprintf(worker_proc, BGW_MAXLEN, "mtm-logrep-receiver-%d-%d", MtmNodeId, nodeId);
 
     /* We're now ready to receive signals */
     BackgroundWorkerUnblockSignals();
@@ -263,6 +274,8 @@ pglogical_receiver_main(Datum main_arg)
     ActivePortal->status = PORTAL_ACTIVE;
     ActivePortal->sourceText = "";
 
+    BgwPoolStart(&Mtm->nodes[nodeId-1].pool, worker_proc);
+
     /*
      * Set proper restartLsn for all origins
      */
@@ -427,7 +440,7 @@ pglogical_receiver_main(Datum main_arg)
         /* Emergency bailout if postmaster has died */
         if (rc & WL_POSTMASTER_DEATH)
         {
-            BgwPoolStop(&Mtm->pool);
+            BgwPoolStop(&Mtm->nodes[nodeId-1].pool);
             proc_exit(1);
         }
 
@@ -729,7 +742,7 @@ void MtmStartReceiver(int nodeId, bool dynamic)
     worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
 
     /* Worker parameter and registration */
-    snprintf(worker.bgw_name, BGW_MAXLEN, "mtm_pglogical_receiver_%d_%d", MtmNodeId, nodeId);
+    snprintf(worker.bgw_name, BGW_MAXLEN, "mtm-logrep-receiver-%d-%d", MtmNodeId, nodeId);
 
     worker.bgw_main_arg = Int32GetDatum(nodeId);
     if (dynamic) {
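Putting the receiver-side pieces together, the start-up order matters: the receiver sets its node id and name first, then starts its pool, so the pool and its workers carry a per-origin identity. A condensed sketch of the flow from this file (error handling and the receive loop elided):

/* Condensed from pglogical_receiver_main above. */
MtmReplicationNodeId = nodeId;
snprintf(worker_proc, BGW_MAXLEN, "mtm-logrep-receiver-%d-%d", MtmNodeId, nodeId);

BackgroundWorkerUnblockSignals();

/* Start this receiver's private pool; its workers are named after worker_proc. */
BgwPoolStart(&Mtm->nodes[nodeId - 1].pool, worker_proc);

/* ... receive loop ... */
if (rc & WL_POSTMASTER_DEATH)
{
    /* Stop only this receiver's workers; other receivers are unaffected. */
    BgwPoolStop(&Mtm->nodes[nodeId - 1].pool);
    proc_exit(1);
}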

src/include/storage/proc.h
Lines changed: 3 additions & 2 deletions

@@ -279,9 +279,10 @@ extern PGPROC *PreparedXactProcs;
 #define NUM_AUXILIARY_PROCS 4
 
 /*
- * Number of extra semaphores used by Postgres (right now 3 semaphores are used by multimaster)
+ * Number of extra semaphores used by Postgres (right now multimaster uses
+ * 1 semaphore for arbiter and 2 semas per each logrep receiver).
  */
-#define NUM_EXTRA_SEMAPHORES 4
+#define NUM_EXTRA_SEMAPHORES 16
 
 /* configurable options */
 extern PGDLLIMPORT int DeadlockTimeout;
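The new constant can be sanity-checked against the updated comment: one arbiter semaphore plus two per logical-replication receiver, where a node of an N-node cluster runs N-1 receivers. The formula below is an inference from that comment, not something stated in the source:

/* Assumed requirement, per the comment above: 1 arbiter semaphore plus
 * 2 per logrep receiver; an N-node cluster runs N-1 receivers per node. */
#define MTM_SEMAS_NEEDED(nodes) (1 + 2 * ((nodes) - 1))

/* MTM_SEMAS_NEEDED(8) == 15 <= 16, so NUM_EXTRA_SEMAPHORES = 16 covers
 * clusters of up to 8 nodes under this reading. */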
