Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ae341ef

Browse files
committed
Add started hook to pglogical
1 parent 57da7fc commit ae341ef

File tree

10 files changed

+108
-8
lines changed

10 files changed

+108
-8
lines changed

contrib/mmts/multimaster.c

+24-7
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ static int MtmMaxRecoveryLag;
245245
static int MtmGcPeriod;
246246
static bool MtmIgnoreTablesWithoutPk;
247247
static int MtmLockCount;
248+
static int MtmSenderStarted;
248249

249250
static ExecutorStart_hook_type PreviousExecutorStartHook;
250251
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -1667,8 +1668,8 @@ void MtmRecoveryCompleted(void)
16671668
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16681669
}
16691670
/* Mode will be changed to online once all logical receiver are connected */
1670-
elog(LOG, "Recovery completed with %d active receivers from %d", Mtm->nReceivers, Mtm->nLiveNodes-1);
1671-
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
1671+
elog(LOG, "Recovery completed with %d active receivers and %d started senders from %d", Mtm->nReceivers, Mtm->nSenders, Mtm->nLiveNodes-1);
1672+
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
16721673
MtmUnlock();
16731674
}
16741675

@@ -2198,6 +2199,7 @@ static void MtmInitialize()
21982199
Mtm->transListHead = NULL;
21992200
Mtm->transListTail = &Mtm->transListHead;
22002201
Mtm->nReceivers = 0;
2202+
Mtm->nSenders = 0;
22012203
Mtm->timeShift = 0;
22022204
Mtm->transCount = 0;
22032205
Mtm->gcCount = 0;
@@ -2906,11 +2908,9 @@ void MtmReceiverStarted(int nodeId)
29062908
MtmEnableNode(nodeId);
29072909
MtmCheckQuorum();
29082910
}
2909-
elog(LOG, "Start %d receivers from %d cluster status %s", Mtm->nReceivers+1, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2910-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
2911-
if (Mtm->status == MTM_CONNECTED) {
2912-
MtmSwitchClusterMode(MTM_ONLINE);
2913-
}
2911+
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2912+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
2913+
MtmSwitchClusterMode(MTM_ONLINE);
29142914
}
29152915
}
29162916
MtmUnlock();
@@ -2997,6 +2997,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
29982998
Mtm->recoverySlot = nodeId;
29992999
Mtm->nReceivers = 0;
3000+
Mtm->nSenders = 0;
30003001
Mtm->recoveryCount += 1;
30013002
Mtm->pglogicalNodeMask = 0;
30023003
MtmUnlock();
@@ -3076,6 +3077,18 @@ MtmOnProcExit(int code, Datum arg)
30763077
}
30773078
}
30783079

3080+
static void
3081+
MtmReplicationStartedHook(struct PGLogicalStartedHookArgs* args)
3082+
{
3083+
MtmLock(LW_EXCLUSIVE);
3084+
MtmSenderStarted = 1;
3085+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3086+
MtmSwitchClusterMode(MTM_ONLINE);
3087+
}
3088+
MtmUnlock();
3089+
}
3090+
3091+
30793092
static void
30803093
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30813094
{
@@ -3192,6 +3205,9 @@ static void
31923205
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
31933206
{
31943207
if (MtmReplicationNodeId >= 0) {
3208+
MtmLock(LW_EXCLUSIVE);
3209+
Mtm->nSenders -= MtmSenderStarted;
3210+
MtmUnlock();
31953211
MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
31963212
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
31973213
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
@@ -3303,6 +3319,7 @@ bool MtmFilterTransaction(char* record, int size)
33033319
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
33043320
{
33053321
hooks->startup_hook = MtmReplicationStartupHook;
3322+
hooks->started_hook = MtmReplicationStartedHook;
33063323
hooks->shutdown_hook = MtmReplicationShutdownHook;
33073324
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
33083325
hooks->row_filter_hook = MtmReplicationRowFilterHook;

contrib/mmts/multimaster.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ typedef struct
273273
int inject2PCError; /* Simulate error during 2PC commit at this node */
274274
int nLiveNodes; /* Number of active nodes */
275275
int nAllNodes; /* Total numbber of nodes */
276-
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
276+
int nReceivers; /* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277+
int nSenders; /* Number of started WAL senders (used to determine moment when recovery) */
277278
int nLockers; /* Number of lockers */
278279
int nActiveTransactions; /* Nunmber of active 2PC transactions */
279280
int nConfigChanges; /* Number of cluster configuration changes */

contrib/mmts/pglogical_config.c

+2
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,8 @@ prepare_startup_message(PGLogicalOutputData *data)
477477
*/
478478
l = add_startup_msg_b(l, "hooks.startup_hook_enabled",
479479
data->hooks.startup_hook != NULL);
480+
l = add_startup_msg_b(l, "hooks.started_hook_enabled",
481+
data->hooks.started_hook != NULL);
480482
l = add_startup_msg_b(l, "hooks.shutdown_hook_enabled",
481483
data->hooks.shutdown_hook != NULL);
482484
l = add_startup_msg_b(l, "hooks.row_filter_enabled",

contrib/mmts/pglogical_hooks.c

+17
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,14 @@ load_hooks(PGLogicalOutputData *data)
9696

9797
elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
9898
"\tstartup_hook: %p\n"
99+
"\tstarted_hook: %p\n"
99100
"\tshutdown_hook: %p\n"
100101
"\trow_filter_hook: %p\n"
101102
"\ttxn_filter_hook: %p\n"
102103
"\thooks_private_data: %p\n",
103104
hooks_func,
104105
data->hooks.startup_hook,
106+
data->hooks.started_hook,
105107
data->hooks.shutdown_hook,
106108
data->hooks.row_filter_hook,
107109
data->hooks.txn_filter_hook,
@@ -118,6 +120,21 @@ load_hooks(PGLogicalOutputData *data)
118120
CommitTransactionCommand();
119121
}
120122

123+
void
124+
call_started_hook(PGLogicalOutputData *data)
125+
{
126+
struct PGLogicalStartedHookArgs args;
127+
MemoryContext old_ctxt;
128+
129+
if (data->hooks.started_hook != NULL)
130+
{
131+
args.private_data = data->hooks.hooks_private_data;
132+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
133+
(void) (*data->hooks.started_hook)(&args);
134+
MemoryContextSwitchTo(old_ctxt);
135+
}
136+
}
137+
121138
void
122139
call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
123140
{

contrib/mmts/pglogical_hooks.h

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ extern void load_hooks(PGLogicalOutputData *data);
1111

1212
extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
1313

14+
extern void call_started_hook(PGLogicalOutputData *data);
15+
1416
extern void call_shutdown_hook(PGLogicalOutputData *data);
1517

1618
extern bool call_row_filter_hook(PGLogicalOutputData *data,

contrib/mmts/pglogical_output.c

+11
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
5454
/* These must be available to pg_dlsym() */
5555
static void pg_decode_startup(LogicalDecodingContext * ctx,
5656
OutputPluginOptions *opt, bool is_init);
57+
static void pg_decode_started(LogicalDecodingContext * ctx);
5758
static void pg_decode_shutdown(LogicalDecodingContext * ctx);
5859
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
5960
ReorderBufferTXN *txn);
@@ -83,6 +84,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8384
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
8485

8586
cb->startup_cb = pg_decode_startup;
87+
cb->started_cb = pg_decode_started;
8688
cb->begin_cb = pg_decode_begin_txn;
8789
cb->change_cb = pg_decode_change;
8890
cb->commit_cb = pg_decode_commit_txn;
@@ -490,6 +492,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
490492
MemoryContextReset(data->context);
491493
}
492494

495+
static void
496+
pg_decode_started(LogicalDecodingContext * ctx)
497+
{
498+
PGLogicalOutputData *data = ctx->output_plugin_private;
499+
call_started_hook(data);
500+
}
501+
502+
503+
493504
/*
494505
* Decide if the whole transaction with specific origin should be filtered out.
495506
*/

contrib/mmts/pglogical_output/hooks.h

+7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ struct PGLogicalStartupHookArgs
2727

2828
typedef void (*pglogical_startup_hook_fn)(struct PGLogicalStartupHookArgs *args);
2929

30+
struct PGLogicalStartedHookArgs
31+
{
32+
void *private_data;
33+
};
34+
35+
typedef void (*pglogical_started_hook_fn)(struct PGLogicalStartedHookArgs *args);
3036

3137
struct PGLogicalTxnFilterArgs
3238
{
@@ -63,6 +69,7 @@ typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *arg
6369
struct PGLogicalHooks
6470
{
6571
pglogical_startup_hook_fn startup_hook;
72+
pglogical_started_hook_fn started_hook;
6673
pglogical_shutdown_hook_fn shutdown_hook;
6774
pglogical_txn_filter_hook_fn txn_filter_hook;
6875
pglogical_row_filter_hook_fn row_filter_hook;

contrib/multimaster/pglogical_output.c

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7676
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
7777

7878
cb->startup_cb = pg_decode_startup;
79+
cb->started_cb = pg_decode_started;
7980
cb->begin_cb = pg_decode_begin_txn;
8081
cb->change_cb = pg_decode_change;
8182
cb->commit_cb = pg_decode_commit_txn;

src/backend/replication/logical/logical.c

+32
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef struct LogicalErrorCallbackState
5454

5555
/* wrappers around output plugin callbacks */
5656
static void output_plugin_error_callback(void *arg);
57+
static void started_cb_wrapper(LogicalDecodingContext *ctx);
5758
static void startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
5859
bool is_init);
5960
static void shutdown_cb_wrapper(LogicalDecodingContext *ctx);
@@ -413,6 +414,7 @@ DecodingContextReady(LogicalDecodingContext *ctx)
413414
void
414415
DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
415416
{
417+
MemoryContext old_context;
416418
XLogRecPtr startptr;
417419

418420
/* Initialize from where to start reading WAL. */
@@ -447,6 +449,11 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
447449
}
448450

449451
ctx->slot->data.confirmed_flush = ctx->reader->EndRecPtr;
452+
453+
old_context = MemoryContextSwitchTo(ctx->context);
454+
if (ctx->callbacks.started_cb != NULL)
455+
started_cb_wrapper(ctx);
456+
MemoryContextSwitchTo(old_context);
450457
}
451458

452459
/*
@@ -562,6 +569,31 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
562569
error_context_stack = errcallback.previous;
563570
}
564571

572+
static void
573+
started_cb_wrapper(LogicalDecodingContext *ctx)
574+
{
575+
LogicalErrorCallbackState state;
576+
ErrorContextCallback errcallback;
577+
578+
/* Push callback + info on the error context stack */
579+
state.ctx = ctx;
580+
state.callback_name = "startup";
581+
state.report_location = InvalidXLogRecPtr;
582+
errcallback.callback = output_plugin_error_callback;
583+
errcallback.arg = (void *) &state;
584+
errcallback.previous = error_context_stack;
585+
error_context_stack = &errcallback;
586+
587+
/* set output state */
588+
ctx->accept_writes = false;
589+
590+
/* do the actual work: call callback */
591+
ctx->callbacks.started_cb(ctx);
592+
593+
/* Pop the error context stack */
594+
error_context_stack = errcallback.previous;
595+
}
596+
565597
static void
566598
shutdown_cb_wrapper(LogicalDecodingContext *ctx)
567599
{

src/include/replication/output_plugin.h

+10
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,15 @@ typedef void (*LogicalDecodeStartupCB) (
4747
bool is_init
4848
);
4949

50+
/*
51+
* Callback that gets called when WAL-sender is started. ctx->private_data can
52+
* be set to some private data.
53+
*
54+
*/
55+
typedef void (*LogicalDecodeStartedCB) (
56+
struct LogicalDecodingContext *ctx
57+
);
58+
5059
/*
5160
* Callback called for every (explicit or implicit) BEGIN of a successful
5261
* transaction.
@@ -105,6 +114,7 @@ typedef void (*LogicalDecodeShutdownCB) (
105114
typedef struct OutputPluginCallbacks
106115
{
107116
LogicalDecodeStartupCB startup_cb;
117+
LogicalDecodeStartedCB started_cb;
108118
LogicalDecodeBeginCB begin_cb;
109119
LogicalDecodeChangeCB change_cb;
110120
LogicalDecodeCommitCB commit_cb;

0 commit comments

Comments
 (0)