Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1d2cfd1

Browse files
knizhnikkelvich
authored andcommitted
Reverse started hook commits
1 parent dde47c0 commit 1d2cfd1

File tree

7 files changed

+8
-66
lines changed

7 files changed

+8
-66
lines changed

multimaster.c

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,6 @@ static int MtmMaxRecoveryLag;
245245
static int MtmGcPeriod;
246246
static bool MtmIgnoreTablesWithoutPk;
247247
static int MtmLockCount;
248-
static int MtmSenderStarted;
249248

250249
static ExecutorStart_hook_type PreviousExecutorStartHook;
251250
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
@@ -1668,8 +1667,8 @@ void MtmRecoveryCompleted(void)
16681667
Mtm->nodes[i].lastHeartbeat = 0; /* defuse watchdog until first heartbeat is received */
16691668
}
16701669
/* Mode will be changed to online once all logical receiver are connected */
1671-
elog(LOG, "Recovery completed with %d active receivers and %d started senders from %d", Mtm->nReceivers, Mtm->nSenders, Mtm->nLiveNodes-1);
1672-
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
1670+
elog(LOG, "Recovery completed with %d active receivers from %d", Mtm->nReceivers, Mtm->nLiveNodes-1);
1671+
MtmSwitchClusterMode(Mtm->nReceivers == Mtm->nLiveNodes-1 ? MTM_ONLINE : MTM_CONNECTED);
16731672
MtmUnlock();
16741673
}
16751674

@@ -2199,7 +2198,6 @@ static void MtmInitialize()
21992198
Mtm->transListHead = NULL;
22002199
Mtm->transListTail = &Mtm->transListHead;
22012200
Mtm->nReceivers = 0;
2202-
Mtm->nSenders = 0;
22032201
Mtm->timeShift = 0;
22042202
Mtm->transCount = 0;
22052203
Mtm->gcCount = 0;
@@ -2908,9 +2906,11 @@ void MtmReceiverStarted(int nodeId)
29082906
MtmEnableNode(nodeId);
29092907
MtmCheckQuorum();
29102908
}
2911-
elog(LOG, "Start %d receivers and %d senders from %d cluster status %s", Mtm->nReceivers+1, Mtm->nSenders, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2912-
if (++Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
2913-
MtmSwitchClusterMode(MTM_ONLINE);
2909+
elog(LOG, "Start %d receivers from %d cluster status %s", Mtm->nReceivers+1, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
2910+
if (++Mtm->nReceivers == Mtm->nLiveNodes-1) {
2911+
if (Mtm->status == MTM_CONNECTED) {
2912+
MtmSwitchClusterMode(MTM_ONLINE);
2913+
}
29142914
}
29152915
}
29162916
MtmUnlock();
@@ -2997,7 +2997,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
29972997
elog(WARNING, "Process %d starts recovery from node %d", MyProcPid, nodeId);
29982998
Mtm->recoverySlot = nodeId;
29992999
Mtm->nReceivers = 0;
3000-
Mtm->nSenders = 0;
30013000
Mtm->recoveryCount += 1;
30023001
Mtm->pglogicalNodeMask = 0;
30033002
MtmUnlock();
@@ -3077,19 +3076,6 @@ MtmOnProcExit(int code, Datum arg)
30773076
}
30783077
}
30793078

3080-
static void
3081-
MtmReplicationStartedHook(struct PGLogicalStartedHookArgs* args)
3082-
{
3083-
MtmLock(LW_EXCLUSIVE);
3084-
MtmSenderStarted = 1;
3085-
elog(LOG, "Start %d senders and %d receivers from %d cluster status %s", Mtm->nSenders+1, Mtm->nReceivers, Mtm->nLiveNodes-1, MtmNodeStatusMnem[Mtm->status]);
3086-
if (++Mtm->nSenders == Mtm->nLiveNodes-1 && Mtm->nReceivers == Mtm->nLiveNodes-1 && Mtm->status == MTM_CONNECTED) {
3087-
MtmSwitchClusterMode(MTM_ONLINE);
3088-
}
3089-
MtmUnlock();
3090-
}
3091-
3092-
30933079
static void
30943080
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
30953081
{
@@ -3206,9 +3192,6 @@ static void
32063192
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
32073193
{
32083194
if (MtmReplicationNodeId >= 0) {
3209-
MtmLock(LW_EXCLUSIVE);
3210-
Mtm->nSenders -= MtmSenderStarted;
3211-
MtmUnlock();
32123195
MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
32133196
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
32143197
MtmReplicationNodeId = -1; /* defuse on_proc_exit hook */
@@ -3320,7 +3303,6 @@ bool MtmFilterTransaction(char* record, int size)
33203303
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
33213304
{
33223305
hooks->startup_hook = MtmReplicationStartupHook;
3323-
hooks->started_hook = MtmReplicationStartedHook;
33243306
hooks->shutdown_hook = MtmReplicationShutdownHook;
33253307
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
33263308
hooks->row_filter_hook = MtmReplicationRowFilterHook;

multimaster.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,8 +273,7 @@ typedef struct
273273
int inject2PCError; /* Simulate error during 2PC commit at this node */
274274
int nLiveNodes; /* Number of active nodes */
275275
int nAllNodes; /* Total numbber of nodes */
276-
int nReceivers; /* Number of initialized logical receivers (used to determine moment when intialization/recovery is completed) */
277-
int nSenders; /* Number of started WAL senders (used to determine moment when recovery) */
276+
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
278277
int nLockers; /* Number of lockers */
279278
int nActiveTransactions; /* Nunmber of active 2PC transactions */
280279
int nConfigChanges; /* Number of cluster configuration changes */

pglogical_config.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -477,8 +477,6 @@ prepare_startup_message(PGLogicalOutputData *data)
477477
*/
478478
l = add_startup_msg_b(l, "hooks.startup_hook_enabled",
479479
data->hooks.startup_hook != NULL);
480-
l = add_startup_msg_b(l, "hooks.started_hook_enabled",
481-
data->hooks.started_hook != NULL);
482480
l = add_startup_msg_b(l, "hooks.shutdown_hook_enabled",
483481
data->hooks.shutdown_hook != NULL);
484482
l = add_startup_msg_b(l, "hooks.row_filter_enabled",

pglogical_hooks.c

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,12 @@ load_hooks(PGLogicalOutputData *data)
9696

9797
elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
9898
"\tstartup_hook: %p\n"
99-
"\tstarted_hook: %p\n"
10099
"\tshutdown_hook: %p\n"
101100
"\trow_filter_hook: %p\n"
102101
"\ttxn_filter_hook: %p\n"
103102
"\thooks_private_data: %p\n",
104103
hooks_func,
105104
data->hooks.startup_hook,
106-
data->hooks.started_hook,
107105
data->hooks.shutdown_hook,
108106
data->hooks.row_filter_hook,
109107
data->hooks.txn_filter_hook,
@@ -120,21 +118,6 @@ load_hooks(PGLogicalOutputData *data)
120118
CommitTransactionCommand();
121119
}
122120

123-
void
124-
call_started_hook(PGLogicalOutputData *data)
125-
{
126-
struct PGLogicalStartedHookArgs args;
127-
MemoryContext old_ctxt;
128-
129-
if (data->hooks.started_hook != NULL)
130-
{
131-
args.private_data = data->hooks.hooks_private_data;
132-
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
133-
(void) (*data->hooks.started_hook)(&args);
134-
MemoryContextSwitchTo(old_ctxt);
135-
}
136-
}
137-
138121
void
139122
call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
140123
{

pglogical_hooks.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ extern void load_hooks(PGLogicalOutputData *data);
1111

1212
extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
1313

14-
extern void call_started_hook(PGLogicalOutputData *data);
15-
1614
extern void call_shutdown_hook(PGLogicalOutputData *data);
1715

1816
extern bool call_row_filter_hook(PGLogicalOutputData *data,

pglogical_output.c

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
5454
/* These must be available to pg_dlsym() */
5555
static void pg_decode_startup(LogicalDecodingContext * ctx,
5656
OutputPluginOptions *opt, bool is_init);
57-
static void pg_decode_started(LogicalDecodingContext * ctx);
5857
static void pg_decode_shutdown(LogicalDecodingContext * ctx);
5958
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
6059
ReorderBufferTXN *txn);
@@ -84,7 +83,6 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8483
AssertVariableIsOfType(&_PG_output_plugin_init, LogicalOutputPluginInit);
8584

8685
cb->startup_cb = pg_decode_startup;
87-
cb->started_cb = pg_decode_started;
8886
cb->begin_cb = pg_decode_begin_txn;
8987
cb->change_cb = pg_decode_change;
9088
cb->commit_cb = pg_decode_commit_txn;
@@ -492,15 +490,6 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
492490
MemoryContextReset(data->context);
493491
}
494492

495-
static void
496-
pg_decode_started(LogicalDecodingContext * ctx)
497-
{
498-
PGLogicalOutputData *data = ctx->output_plugin_private;
499-
call_started_hook(data);
500-
}
501-
502-
503-
504493
/*
505494
* Decide if the whole transaction with specific origin should be filtered out.
506495
*/

pglogical_output/hooks.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ struct PGLogicalStartupHookArgs
2727

2828
typedef void (*pglogical_startup_hook_fn)(struct PGLogicalStartupHookArgs *args);
2929

30-
struct PGLogicalStartedHookArgs
31-
{
32-
void *private_data;
33-
};
34-
35-
typedef void (*pglogical_started_hook_fn)(struct PGLogicalStartedHookArgs *args);
3630

3731
struct PGLogicalTxnFilterArgs
3832
{
@@ -69,7 +63,6 @@ typedef void (*pglogical_shutdown_hook_fn)(struct PGLogicalShutdownHookArgs *arg
6963
struct PGLogicalHooks
7064
{
7165
pglogical_startup_hook_fn startup_hook;
72-
pglogical_started_hook_fn started_hook;
7366
pglogical_shutdown_hook_fn shutdown_hook;
7467
pglogical_txn_filter_hook_fn txn_filter_hook;
7568
pglogical_row_filter_hook_fn row_filter_hook;

0 commit comments

Comments
 (0)