Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 32297b0

Browse files
knizhnikkelvich
authored andcommitted
Set logical decoding hooks
1 parent 0fcf740 commit 32297b0

6 files changed

+131
-75
lines changed

multimaster.c

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "nodes/makefuncs.h"
5454
#include "access/htup_details.h"
5555
#include "catalog/indexing.h"
56+
#include "pglogical_output/hooks.h"
5657

5758
#include "multimaster.h"
5859
#include "ddd.h"
@@ -161,6 +162,7 @@ bool MtmDoReplication;
161162
char* MtmDatabaseName;
162163

163164
int MtmNodeId;
165+
int MtmReplicationNodeId;
164166
int MtmArbiterPort;
165167
int MtmNodes;
166168
int MtmConnectAttempts;
@@ -1637,6 +1639,27 @@ void MtmDropNode(int nodeId, bool dropSlot)
16371639
}
16381640
}
16391641

1642+
static void
1643+
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
1644+
{
1645+
MtmOnNodeDisconnect(MtmReplicationNodeId);
1646+
}
1647+
1648+
static bool
1649+
MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1650+
{
1651+
elog(WARNING, "MtmReplicationTxnFilterHook: args->origin_id=%d, MtmReplicationNodeId=%d", args->origin_id, MtmReplicationNodeId);
1652+
return args->origin_id == InvalidRepOriginId || MtmIsRecoveredNode(MtmReplicationNodeId);
1653+
}
1654+
1655+
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
1656+
{
1657+
hooks->shutdown_hook = MtmReplicationShutdownHook;
1658+
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
1659+
}
1660+
1661+
1662+
16401663
/*
16411664
* -------------------------------------------
16421665
* SQL API functions
@@ -1988,16 +2011,42 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19882011
case T_ClosePortalStmt:
19892012
case T_FetchStmt:
19902013
case T_DoStmt:
2014+
case T_CreateTableSpaceStmt:
2015+
case T_DropTableSpaceStmt:
2016+
case T_AlterTableSpaceOptionsStmt:
2017+
case T_TruncateStmt:
2018+
case T_CommentStmt: /* XXX: we could replicate these */;
19912019
case T_CopyStmt:
19922020
case T_PrepareStmt:
19932021
case T_ExecuteStmt:
2022+
case T_DeallocateStmt:
2023+
case T_GrantStmt: /* XXX: we could replicate some of these these */;
2024+
case T_GrantRoleStmt:
2025+
case T_AlterDatabaseStmt:
2026+
case T_AlterDatabaseSetStmt:
19942027
case T_NotifyStmt:
19952028
case T_ListenStmt:
19962029
case T_UnlistenStmt:
19972030
case T_LoadStmt:
2031+
case T_ClusterStmt: /* XXX: we could replicate these */;
2032+
case T_VacuumStmt:
2033+
case T_ExplainStmt:
2034+
case T_AlterSystemStmt:
19982035
case T_VariableSetStmt:
19992036
case T_VariableShowStmt:
2000-
skipCommand = true;
2037+
case T_DiscardStmt:
2038+
case T_CreateEventTrigStmt:
2039+
case T_AlterEventTrigStmt:
2040+
case T_CreateRoleStmt:
2041+
case T_AlterRoleStmt:
2042+
case T_AlterRoleSetStmt:
2043+
case T_DropRoleStmt:
2044+
case T_ReassignOwnedStmt:
2045+
case T_LockStmt:
2046+
case T_ConstraintsSetStmt:
2047+
case T_CheckPointStmt:
2048+
case T_ReindexStmt:
2049+
skipCommand = true;
20012050
break;
20022051
default:
20032052
skipCommand = false;

multimaster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
#include "bgwpool.h"
66
#include "bkb.h"
77

8+
#include "pglogical_output/hooks.h"
9+
810
#define MTM_TUPLE_TRACE(fmt, ...)
911
/*
1012
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -149,6 +151,7 @@ extern char const* const MtmNodeStatusMnem[];
149151
extern MtmState* Mtm;
150152

151153
extern int MtmNodeId;
154+
extern int MtmReplicationNodeId;
152155
extern int MtmNodes;
153156
extern int MtmArbiterPort;
154157
extern char* MtmDatabaseName;
@@ -192,5 +195,5 @@ extern bool MtmIsRecoveredNode(int nodeId);
192195
extern void MtmRefreshClusterStatus(bool nowait);
193196
extern void MtmSwitchClusterMode(MtmNodeStatus mode);
194197
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
195-
198+
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
196199
#endif

pglogical_hooks.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ load_hooks(PGLogicalOutputData *data)
106106
data->hooks.row_filter_hook,
107107
data->hooks.txn_filter_hook,
108108
data->hooks.hooks_private_data);
109+
}
110+
else if (data->api->setup_hooks)
111+
{
112+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
113+
(*data->api->setup_hooks)(&data->hooks);
114+
MemoryContextSwitchTo(old_ctxt);
109115
}
110116

111117
if (txn_started)

pglogical_output.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
343343
data->forward_changeset_origins = false;
344344
}
345345

346-
if (data->hooks_setup_funcname != NIL)
346+
if (data->hooks_setup_funcname != NIL || data->api->setup_hooks)
347347
{
348348

349349
data->hooks_mctxt = AllocSetContextCreate(ctx->context,

pglogical_proto.c

Lines changed: 66 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,7 @@
3737

3838
#include "multimaster.h"
3939

40-
typedef struct PGLogicalProtoMM
41-
{
42-
PGLogicalProtoAPI api;
43-
int nodeId;
44-
bool isLocal;
45-
} PGLogicalProtoMM;
40+
static bool MtmIsFilteredTxn;
4641

4742
static void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel);
4843

@@ -72,30 +67,31 @@ static char decide_datum_transfer(Form_pg_attribute att,
7267
static void
7368
pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7469
{
75-
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
76-
if (!mm->isLocal) {
77-
const char *nspname;
78-
uint8 nspnamelen;
79-
const char *relname;
80-
uint8 relnamelen;
81-
82-
pq_sendbyte(out, 'R'); /* sending RELATION */
83-
84-
nspname = get_namespace_name(rel->rd_rel->relnamespace);
85-
if (nspname == NULL)
86-
elog(ERROR, "cache lookup failed for namespace %u",
87-
rel->rd_rel->relnamespace);
88-
nspnamelen = strlen(nspname) + 1;
89-
90-
relname = NameStr(rel->rd_rel->relname);
91-
relnamelen = strlen(relname) + 1;
92-
93-
pq_sendbyte(out, nspnamelen); /* schema name length */
94-
pq_sendbytes(out, nspname, nspnamelen);
95-
96-
pq_sendbyte(out, relnamelen); /* table name length */
97-
pq_sendbytes(out, relname, relnamelen);
98-
}
70+
const char *nspname;
71+
uint8 nspnamelen;
72+
const char *relname;
73+
uint8 relnamelen;
74+
75+
if (MtmIsFilteredTxn) {
76+
return;
77+
}
78+
79+
pq_sendbyte(out, 'R'); /* sending RELATION */
80+
81+
nspname = get_namespace_name(rel->rd_rel->relnamespace);
82+
if (nspname == NULL)
83+
elog(ERROR, "cache lookup failed for namespace %u",
84+
rel->rd_rel->relnamespace);
85+
nspnamelen = strlen(nspname) + 1;
86+
87+
relname = NameStr(rel->rd_rel->relname);
88+
relnamelen = strlen(relname) + 1;
89+
90+
pq_sendbyte(out, nspnamelen); /* schema name length */
91+
pq_sendbytes(out, nspname, nspnamelen);
92+
93+
pq_sendbyte(out, relnamelen); /* table name length */
94+
pq_sendbytes(out, relname, relnamelen);
9995
}
10096

10197
/*
@@ -105,21 +101,19 @@ static void
105101
pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
106102
ReorderBufferTXN *txn)
107103
{
108-
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
104+
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
109105
csn_t csn = MtmTransactionSnapshot(txn->xid);
110-
bool isRecovery = MtmIsRecoveredNode(mm->nodeId);
111106
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n", txn->xid, csn);
112-
if (csn == INVALID_CSN && !isRecovery) {
113-
//Assert(txn->origin_id != InvalidRepOriginId);
114-
mm->isLocal = true;
115-
} else {
116-
mm->isLocal = false;
117-
//Assert(txn->origin_id == InvalidRepOriginId || isRecovery);
118-
pq_sendbyte(out, 'B'); /* BEGIN */
107+
108+
if (csn == INVALID_CSN && !isRecovery) {
109+
MtmIsFilteredTxn = true;
110+
} else {
111+
pq_sendbyte(out, 'B'); /* BEGIN */
119112
pq_sendint(out, MtmNodeId, 4);
120113
pq_sendint(out, isRecovery ? InvalidTransactionId : txn->xid, 4);
121-
pq_sendint64(out, csn);
122-
}
114+
pq_sendint64(out, csn);
115+
MtmIsFilteredTxn = false;
116+
}
123117
}
124118

125119
/*
@@ -129,9 +123,11 @@ static void
129123
pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
130124
ReorderBufferTXN *txn, XLogRecPtr commit_lsn)
131125
{
132-
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
133126
uint8 flags = 0;
134127

128+
if (MtmIsFilteredTxn) {
129+
return;
130+
}
135131
if (txn->xact_action == XLOG_XACT_COMMIT)
136132
flags = PGLOGICAL_COMMIT;
137133
else if (txn->xact_action == XLOG_XACT_PREPARE)
@@ -143,18 +139,19 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
143139
else
144140
Assert(false);
145141

146-
142+
#if 0
147143
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
148144
if (mm->isLocal) {
149145
return;
150146
}
151147
} else {
152148
csn_t csn = MtmTransactionSnapshot(txn->xid);
153-
bool isRecovery = MtmIsRecoveredNode(mm->nodeId);
149+
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
154150
if (csn == INVALID_CSN && !isRecovery) {
155151
return;
156152
}
157153
}
154+
#endif
158155

159156
pq_sendbyte(out, 'C'); /* sending COMMIT */
160157

@@ -185,11 +182,10 @@ static void
185182
pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
186183
Relation rel, HeapTuple newtuple)
187184
{
188-
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
189-
if (!mm->isLocal) {
190-
pq_sendbyte(out, 'I'); /* action INSERT */
191-
pglogical_write_tuple(out, data, rel, newtuple);
192-
}
185+
if (!MtmIsFilteredTxn) {
186+
pq_sendbyte(out, 'I'); /* action INSERT */
187+
pglogical_write_tuple(out, data, rel, newtuple);
188+
}
193189
}
194190

195191
/*
@@ -199,32 +195,31 @@ static void
199195
pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
200196
Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
201197
{
202-
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
203-
if (!mm->isLocal) {
204-
pq_sendbyte(out, 'U'); /* action UPDATE */
205-
/* FIXME support whole tuple (O tuple type) */
206-
if (oldtuple != NULL)
207-
{
208-
pq_sendbyte(out, 'K'); /* old key follows */
209-
pglogical_write_tuple(out, data, rel, oldtuple);
210-
}
211-
212-
pq_sendbyte(out, 'N'); /* new tuple follows */
213-
pglogical_write_tuple(out, data, rel, newtuple);
214-
}
198+
if (!MtmIsFilteredTxn) {
199+
pq_sendbyte(out, 'U'); /* action UPDATE */
200+
/* FIXME support whole tuple (O tuple type) */
201+
if (oldtuple != NULL)
202+
{
203+
pq_sendbyte(out, 'K'); /* old key follows */
204+
pglogical_write_tuple(out, data, rel, oldtuple);
205+
}
206+
207+
pq_sendbyte(out, 'N'); /* new tuple follows */
208+
pglogical_write_tuple(out, data, rel, newtuple);
209+
}
215210
}
211+
216212
/*
217213
* Write DELETE to the output stream.
218214
*/
219215
static void
220216
pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
221217
Relation rel, HeapTuple oldtuple)
222218
{
223-
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
224-
if (!mm->isLocal) {
225-
pq_sendbyte(out, 'D'); /* action DELETE */
226-
pglogical_write_tuple(out, data, rel, oldtuple);
227-
}
219+
if (!MtmIsFilteredTxn) {
220+
pq_sendbyte(out, 'D'); /* action DELETE */
221+
pglogical_write_tuple(out, data, rel, oldtuple);
222+
}
228223
}
229224

230225
/*
@@ -422,16 +417,16 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
422417
PGLogicalProtoAPI *
423418
pglogical_init_api(PGLogicalProtoType typ)
424419
{
425-
PGLogicalProtoMM* pmm = palloc0(sizeof(PGLogicalProtoMM));
426-
PGLogicalProtoAPI* res = &pmm->api;
427-
pmm->isLocal = false;
428-
sscanf(MyReplicationSlot->data.name.data, MULTIMASTER_SLOT_PATTERN, &pmm->nodeId);
420+
PGLogicalProtoAPI* res = palloc0(sizeof(PGLogicalProtoAPI));
421+
sscanf(MyReplicationSlot->data.name.data, MULTIMASTER_SLOT_PATTERN, &MtmReplicationNodeId);
422+
elog(WARNING, "%d: PRGLOGICAL init API for slot %s node %d", MyProcPid, MyReplicationSlot->data.name.data, MtmReplicationNodeId);
429423
res->write_rel = pglogical_write_rel;
430424
res->write_begin = pglogical_write_begin;
431425
res->write_commit = pglogical_write_commit;
432426
res->write_insert = pglogical_write_insert;
433427
res->write_update = pglogical_write_update;
434428
res->write_delete = pglogical_write_delete;
429+
res->setup_hooks = MtmSetupReplicationHooks;
435430
res->write_startup_message = write_startup_message;
436431
return res;
437432
}

pglogical_proto.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ typedef void (*pglogical_write_delete_fn)(StringInfo out, PGLogicalOutputData *d
3333

3434
typedef void (*write_startup_message_fn)(StringInfo out, List *msg);
3535

36+
typedef void (*pglogical_setup_hooks_fn)(struct PGLogicalHooks* hooks);
37+
3638
typedef struct PGLogicalProtoAPI
3739
{
3840
pglogical_write_rel_fn write_rel;
@@ -42,7 +44,8 @@ typedef struct PGLogicalProtoAPI
4244
pglogical_write_insert_fn write_insert;
4345
pglogical_write_update_fn write_update;
4446
pglogical_write_delete_fn write_delete;
45-
write_startup_message_fn write_startup_message;
47+
pglogical_setup_hooks_fn setup_hooks;
48+
write_startup_message_fn write_startup_message;
4649
} PGLogicalProtoAPI;
4750

4851

0 commit comments

Comments
 (0)