Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ef58b15

Browse files
knizhnikkelvich
authored andcommitted
Add started hook to pglogical
1 parent 9e43e7b commit ef58b15

File tree

7 files changed

+65
-8
lines changed

7 files changed

+65
-8
lines changed

multimaster.c

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ static int MtmMaxRecoveryLag;
245245
static int MtmGcPeriod;
246246
static bool MtmIgnoreTablesWithoutPk;
247247
static int MtmLockCount;
248+
static int MtmSenderStarted;
248249

249250
static ExecutorStart_hook_type PreviousExecutorStartHook;
250251
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -1667,8 +1668,8 @@ void MtmRecoveryCompleted(void)
16671668
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16681669
}
16691670
/* Mode will be changed to online once all logical receiver are connected */
1670-
elog(LOG, "Recovery completed with %d active receivers from %d", Mtm->nReceivers, Mtm->nLiveNodes-1);
1671-
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
1671+
elog(LOG, "Recovery completed with %d active receivers and %d started senders from %d", Mtm->nReceivers, Mtm->nSenders, Mtm->nLiveNodes-1);
1672+
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
16721673
MtmUnlock();
16731674
}
16741675

@@ -2198,6 +2199,7 @@ static void MtmInitialize()
21982199
Mtm->transListHead = NULL;
21992200
Mtm->transListTail = &Mtm->transListHead;
22002201
Mtm->nReceivers = 0;
2202+
Mtm->nSenders = 0;
22012203
Mtm->timeShift = 0;
22022204
Mtm->transCount = 0;
22032205
Mtm->gcCount = 0;
@@ -2906,11 +2908,9 @@ void MtmReceiverStarted(int nodeId)
29062908
MtmEnableNode(nodeId);
29072909
MtmCheckQuorum();
29082910
}
2909-
elog(LOG, "Start %d receivers from %d cluster status %s", Mtm->nReceivers+1, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2910-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
2911-
if (Mtm->status == MTM_CONNECTED) {
2912-
MtmSwitchClusterMode(MTM_ONLINE);
2913-
}
2911+
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2912+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
2913+
MtmSwitchClusterMode(MTM_ONLINE);
29142914
}
29152915
}
29162916
MtmUnlock();
@@ -2997,6 +2997,7 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
29982998
Mtm->recoverySlot = nodeId;
29992999
Mtm->nReceivers = 0;
3000+
Mtm->nSenders = 0;
30003001
Mtm->recoveryCount += 1;
30013002
Mtm->pglogicalNodeMask = 0;
30023003
MtmUnlock();
@@ -3076,6 +3077,18 @@ MtmOnProcExit(int code, Datum arg)
30763077
}
30773078
}
30783079

3080+
static void
3081+
MtmReplicationStartedHook(struct PGLogicalStartedHookArgs* args)
3082+
{
3083+
MtmLock(LW_EXCLUSIVE);
3084+
MtmSenderStarted = 1;
3085+
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3086+
MtmSwitchClusterMode(MTM_ONLINE);
3087+
}
3088+
MtmUnlock();
3089+
}
3090+
3091+
30793092
static void
30803093
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30813094
{
@@ -3192,6 +3205,9 @@ static void
31923205
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
31933206
{
31943207
if (MtmReplicationNodeId >= 0) {
3208+
MtmLock(LW_EXCLUSIVE);
3209+
Mtm->nSenders -= MtmSenderStarted;
3210+
MtmUnlock();
31953211
MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
31963212
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
31973213
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
@@ -3303,6 +3319,7 @@ bool MtmFilterTransaction(char* record, int size)
33033319
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
33043320
{
33053321
hooks->startup_hook = MtmReplicationStartupHook;
3322+
hooks->started_hook = MtmReplicationStartedHook;
33063323
hooks->shutdown_hook = MtmReplicationShutdownHook;
33073324
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
33083325
hooks->row_filter_hook = MtmReplicationRowFilterHook;

multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,8 @@ typedef struct
273273
int inject2PCError; /* Simulate error during 2PC commit at this node */
274274
int nLiveNodes; /* Number of active nodes */
275275
int nAllNodes; /* Total numbber of nodes */
276-
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
276+
int nReceivers; /* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277+
int nSenders; /* Number of started WAL senders (used to determine moment when recovery) */
277278
int nLockers; /* Number of lockers */
278279
int nActiveTransactions; /* Nunmber of active 2PC transactions */
279280
int nConfigChanges; /* Number of cluster configuration changes */

pglogical_config.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,8 @@ prepare_startup_message(PGLogicalOutputData *data)
477477
*/
478478
l = add_startup_msg_b(l, "hooks.startup_hook_enabled",
479479
data->hooks.startup_hook != NULL);
480+
l = add_startup_msg_b(l, "hooks.started_hook_enabled",
481+
data->hooks.started_hook != NULL);
480482
l = add_startup_msg_b(l, "hooks.shutdown_hook_enabled",
481483
data->hooks.shutdown_hook != NULL);
482484
l = add_startup_msg_b(l, "hooks.row_filter_enabled",

pglogical_hooks.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,14 @@ load_hooks(PGLogicalOutputData *data)
9696

9797
elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
9898
"\tstartup_hook: %p\n"
99+
"\tstarted_hook: %p\n"
99100
"\tshutdown_hook: %p\n"
100101
"\trow_filter_hook: %p\n"
101102
"\ttxn_filter_hook: %p\n"
102103
"\thooks_private_data: %p\n",
103104
hooks_func,
104105
data->hooks.startup_hook,
106+
data->hooks.started_hook,
105107
data->hooks.shutdown_hook,
106108
data->hooks.row_filter_hook,
107109
data->hooks.txn_filter_hook,
@@ -118,6 +120,21 @@ load_hooks(PGLogicalOutputData *data)
118120
CommitTransactionCommand();
119121
}
120122

123+
void
124+
call_started_hook(PGLogicalOutputData *data)
125+
{
126+
struct PGLogicalStartedHookArgs args;
127+
MemoryContext old_ctxt;
128+
129+
if (data->hooks.started_hook != NULL)
130+
{
131+
args.private_data = data->hooks.hooks_private_data;
132+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
133+
(void) (*data->hooks.started_hook)(&args);
134+
MemoryContextSwitchTo(old_ctxt);
135+
}
136+
}
137+
121138
void
122139
call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
123140
{

pglogical_hooks.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ extern void load_hooks(PGLogicalOutputData *data);
1111

1212
extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
1313

14+
extern void call_started_hook(PGLogicalOutputData *data);
15+
1416
extern void call_shutdown_hook(PGLogicalOutputData *data);
1517

1618
extern bool call_row_filter_hook(PGLogicalOutputData *data,

pglogical_output.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
5454
/* These must be available to pg_dlsym() */
5555
static void pg_decode_startup(LogicalDecodingContext * ctx,
5656
OutputPluginOptions *opt, bool is_init);
57+
static void pg_decode_started(LogicalDecodingContext * ctx);
5758
static void pg_decode_shutdown(LogicalDecodingContext * ctx);
5859
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
5960
ReorderBufferTXN *txn);
@@ -83,6 +84,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8384
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
8485

8586
cb->startup_cb = pg_decode_startup;
87+
cb->started_cb = pg_decode_started;
8688
cb->begin_cb = pg_decode_begin_txn;
8789
cb->change_cb = pg_decode_change;
8890
cb->commit_cb = pg_decode_commit_txn;
@@ -490,6 +492,15 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
490492
MemoryContextReset(data->context);
491493
}
492494

495+
static void
496+
pg_decode_started(LogicalDecodingContext * ctx)
497+
{
498+
PGLogicalOutputData *data = ctx->output_plugin_private;
499+
call_started_hook(data);
500+
}
501+
502+
503+
493504
/*
494505
* Decide if the whole transaction with specific origin should be filtered out.
495506
*/

pglogical_output/hooks.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ struct PGLogicalStartupHookArgs
2727

2828
typedef void (*pglogical_startup_hook_fn)(struct PGLogicalStartupHookArgs *args);
2929

30+
struct PGLogicalStartedHookArgs
31+
{
32+
void *private_data;
33+
};
34+
35+
typedef void (*pglogical_started_hook_fn)(struct PGLogicalStartedHookArgs *args);
3036

3137
struct PGLogicalTxnFilterArgs
3238
{
@@ -63,6 +69,7 @@ typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *arg
6369
struct PGLogicalHooks
6470
{
6571
pglogical_startup_hook_fn startup_hook;
72+
pglogical_started_hook_fn started_hook;
6673
pglogical_shutdown_hook_fn shutdown_hook;
6774
pglogical_txn_filter_hook_fn txn_filter_hook;
6875
pglogical_row_filter_hook_fn row_filter_hook;

0 commit comments

Comments
 (0)