Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9cbd0fe

Browse files
knizhnikkelvich
authored andcommitted
Start dynamic pool workers
1 parent 2761da1 commit 9cbd0fe

File tree

6 files changed

+69
-24
lines changed

6 files changed

+69
-24
lines changed

bgwpool.c

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,15 @@
77
#include "storage/spin.h"
88
#include "storage/pg_sema.h"
99
#include "storage/shmem.h"
10+
#include "datatype/timestamp.h"
1011

1112
#include "bgwpool.h"
1213

1314
bool MtmIsLogicalReceiver;
15+
int MtmMaxWorkers;
1416

15-
typedef struct
17+
static void BgwPoolMainLoop(BgwPool* pool)
1618
{
17-
BgwPoolConstructor constructor;
18-
int id;
19-
} BgwPoolExecutorCtx;
20-
21-
static void BgwPoolMainLoop(Datum arg)
22-
{
23-
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)arg;
24-
int id = ctx->id;
25-
BgwPool* pool = ctx->constructor();
2619
int size;
2720
void* work;
2821

@@ -58,7 +51,7 @@ static void BgwPoolMainLoop(Datum arg)
5851
pool->lastPeakTime = 0;
5952
}
6053
SpinLockRelease(&pool->lock);
61-
pool->executor(id, work, size);
54+
pool->executor(work, size);
6255
free(work);
6356
SpinLockAcquire(&pool->lock);
6457
pool->active -= 1;
@@ -84,6 +77,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
8477
pool->pending = 0;
8578
pool->nWorkers = nWorkers;
8679
pool->lastPeakTime = 0;
80+
pool->lastDynamicWorkerStartTime = 0;
8781
strncpy(pool->dbname, dbname, MAX_DBNAME_LEN);
8882
strncpy(pool->dbuser, dbuser, MAX_DBUSER_LEN);
8983
}
@@ -93,6 +87,17 @@ timestamp_t BgwGetLastPeekTime(BgwPool* pool)
9387
return pool->lastPeakTime;
9488
}
9589

90+
static void BgwPoolStaticWorkerMainLoop(Datum arg)
91+
{
92+
BgwPoolConstructor constructor = (BgwPoolConstructor)DatumGetPointer(arg);
93+
BgwPoolMainLoop(constructor());
94+
}
95+
96+
static void BgwPoolDynamicWorkerMainLoop(Datum arg)
97+
{
98+
BgwPoolMainLoop((BgwPool*)DatumGetPointer(arg));
99+
}
100+
96101
void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
97102
{
98103
int i;
@@ -101,15 +106,12 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
101106
MemSet(&worker, 0, sizeof(BackgroundWorker));
102107
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
103108
worker.bgw_start_time = BgWorkerStart_ConsistentState;
104-
worker.bgw_main = BgwPoolMainLoop;
109+
worker.bgw_main = BgwPoolStaticWorkerMainLoop;
105110
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
106111

107112
for (i = 0; i < nWorkers; i++) {
108-
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
109113
snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_worker_%d", i+1);
110-
ctx->id = i;
111-
ctx->constructor = constructor;
112-
worker.bgw_main_arg = (Datum)ctx;
114+
worker.bgw_main_arg = PointerGetDatum(constructor);
113115
RegisterBackgroundWorker(&worker);
114116
}
115117
}
@@ -124,14 +126,36 @@ size_t BgwPoolGetQueueSize(BgwPool* pool)
124126
}
125127

126128

129+
static void BgwStartExtraWorker(BgwPool* pool)
130+
{
131+
if (pool->nWorkers < MtmMaxWorkers) {
132+
timestamp_t now = MtmGetSystemTime();
133+
if (pool->lastDynamicWorkerStartTime + MULTIMASTER_BGW_RESTART_TIMEOUT*USECS_PER_SEC < now) {
134+
BackgroundWorker worker;
135+
BackgroundWorkerHandle* handle;
136+
MemSet(&worker, 0, sizeof(BackgroundWorker));
137+
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
138+
worker.bgw_start_time = BgWorkerStart_ConsistentState;
139+
worker.bgw_main = BgwPoolDynamicWorkerMainLoop;
140+
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
141+
snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_dynworker_%d", (int)++pool->nWorkers);
142+
worker.bgw_main_arg = PointerGetDatum(pool);
143+
pool->lastDynamicWorkerStartTime = now;
144+
if (!RegisterDynamicBackgroundWorker(&worker, &handle)) {
145+
elog(WARNING, "Failed to start dynamic background worker");
146+
}
147+
}
148+
}
149+
}
150+
127151
void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
128152
{
129153
if (size+4 > pool->size) {
130154
/*
131155
* Size of work is larger than size of shared buffer:
132156
* run it immediately
133157
*/
134-
pool->executor(0, work, size);
158+
pool->executor(work, size);
135159
return;
136160
}
137161

@@ -149,6 +173,9 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
149173
SpinLockAcquire(&pool->lock);
150174
} else {
151175
pool->pending += 1;
176+
if (pool->active == pool->nWorkers) {
177+
BgwStartExtraWorker(pool);
178+
}
152179
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
153180
pool->lastPeakTime = MtmGetSystemTime();
154181
}

bgwpool.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include "storage/spin.h"
66
#include "storage/pg_sema.h"
77

8-
typedef void(*BgwPoolExecutor)(int id, void* work, size_t size);
8+
typedef void(*BgwPoolExecutor)(void* work, size_t size);
99

1010
typedef uint64 timestamp_t;
1111

@@ -16,7 +16,8 @@ typedef uint64 timestamp_t;
1616
extern timestamp_t MtmGetSystemTime(void); /* non-adjusted current system time */
1717
extern timestamp_t MtmGetCurrentTime(void); /* adjusted current system time */
1818

19-
extern bool MtmIsLogicalReceiver;
19+
extern bool MtmIsLogicalReceiver;
20+
extern int MtmMaxWorkers;
2021

2122
typedef struct
2223
{
@@ -31,6 +32,7 @@ typedef struct
3132
size_t pending;
3233
size_t nWorkers;
3334
time_t lastPeakTime;
35+
timestamp_t lastDynamicWorkerStartTime;
3436
bool producerBlocked;
3537
char dbname[MAX_DBNAME_LEN];
3638
char dbuser[MAX_DBUSER_LEN];

multimaster.c

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2370,7 +2370,7 @@ _PG_init(void)
23702370

23712371
DefineCustomIntVariable(
23722372
"multimaster.workers",
2373-
"Number of multimaster executor workers per node",
2373+
"Number of multimaster executor workers",
23742374
NULL,
23752375
&MtmWorkers,
23762376
8,
@@ -2383,6 +2383,21 @@ _PG_init(void)
23832383
NULL
23842384
);
23852385

2386+
DefineCustomIntVariable(
2387+
"multimaster.max_workers",
2388+
"Maximal number of multimaster dynamic executor workers",
2389+
NULL,
2390+
&MtmMaxWorkers,
2391+
100,
2392+
0,
2393+
INT_MAX,
2394+
PGC_BACKEND,
2395+
0,
2396+
NULL,
2397+
NULL,
2398+
NULL
2399+
);
2400+
23862401
DefineCustomIntVariable(
23872402
"multimaster.vacuum_delay",
23882403
"Minimal age of records which can be vacuumed (seconds)",
@@ -2919,6 +2934,7 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
29192934
PG_RETURN_INT64(MtmTx.snapshot);
29202935
}
29212936

2937+
29222938
Datum
29232939
mtm_get_last_csn(PG_FUNCTION_ARGS)
29242940
{
@@ -3785,7 +3801,7 @@ void MtmExecute(void* work, int size)
37853801
{
37863802
if (Mtm->status == MTM_RECOVERY) {
37873803
/* During recovery apply changes sequentially to preserve commit order */
3788-
MtmExecutor(0, work, size);
3804+
MtmExecutor(work, size);
37893805
} else {
37903806
BgwPoolExecute(&Mtm->pool, work, size);
37913807
}

multimaster.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot);
301301
extern void MtmReceiverStarted(int nodeId);
302302
extern MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown);
303303
extern void MtmExecute(void* work, int size);
304-
extern void MtmExecutor(int id, void* work, size_t size);
304+
extern void MtmExecutor(void* work, size_t size);
305305
extern void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd);
306306
extern void MtmSendMessage(MtmArbiterMessage* msg);
307307
extern void MtmAdjustSubtransactions(MtmTransState* ts);

pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -953,7 +953,7 @@ process_remote_delete(StringInfo s, Relation rel)
953953

954954
static MemoryContext ApplyContext;
955955

956-
void MtmExecutor(int id, void* work, size_t size)
956+
void MtmExecutor(void* work, size_t size)
957957
{
958958
StringInfoData s;
959959
Relation rel = NULL;

pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ pglogical_receiver_main(Datum main_arg)
530530
} else {
531531
if (MtmPreserveCommitOrder && buf.used == rc - hdr_len) {
532532
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
533-
MtmExecutor(nodeId, buf.data, buf.used);
533+
MtmExecutor(buf.data, buf.used);
534534
} else {
535535
MtmExecute(buf.data, buf.used);
536536
}

0 commit comments

Comments
 (0)