Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 61c71a4

Browse files
committed
Trace logical decoding
1 parent 64911de commit 61c71a4

File tree

7 files changed: +69 -21 lines changed

contrib/mmts/arbiter.c

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -129,15 +129,15 @@ static char const* const messageText[] =
129129

130130
static BackgroundWorker MtmSender = {
131131
"mtm-sender",
132-
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
132+
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
133133
BgWorkerStart_ConsistentState,
134134
MULTIMASTER_BGW_RESTART_TIMEOUT,
135135
MtmTransSender
136136
};
137137

138138
static BackgroundWorker MtmRecevier = {
139139
"mtm-receiver",
140-
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
140+
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
141141
BgWorkerStart_ConsistentState,
142142
MULTIMASTER_BGW_RESTART_TIMEOUT,
143143
MtmTransReceiver

contrib/mmts/multimaster.c

Lines changed: 19 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -981,7 +981,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
981981
/* Assert(ts->status == TRANSACTION_STATUS_UNKNOWN); */
982982
Assert(ts->status == TRANSACTION_STATUS_UNKNOWN
983983
|| (ts->status == TRANSACTION_STATUS_IN_PROGRESS && Mtm->status == MTM_RECOVERY)); /* ??? Why there is commit without prepare */
984-
if (x->csn > ts->csn) {
984+
if (x->csn > ts->csn || Mtm->status == MTM_RECOVERY) {
985985
ts->csn = x->csn;
986986
MtmSyncClock(ts->csn);
987987
}
@@ -1505,7 +1505,7 @@ bool MtmRefreshClusterStatus(bool nowait)
15051505
MtmWakeUpBackend(ts);
15061506
}
15071507
#if 0
1508-
} else if (TransactionIdIsValid(ts->gtid.xid) && BIT_CHECK(disabled, ts->gtid.node-1)) { // coordinator of transaction is on disabled node
1508+
} else if (TransactionIdIsValid(ts->gtid.xid) && BIT_CHECK(disabled, ts->gtid.node-1)) { /* coordinator of transaction is on disabled node */
15091509
if (ts->gid[0]) {
15101510
if (ts->status == TRANSACTION_STATUS_UNKNOWN || ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
15111511
MTM_LOG1("%d: Abort trasaction %s because its coordinator is at disabled node %d", MyProcPid, ts->gid, ts->gtid.node);
@@ -1595,12 +1595,14 @@ void MtmOnNodeDisconnect(int nodeId)
15951595
MtmAbortTransaction(ts);
15961596
MtmWakeUpBackend(ts);
15971597
}
1598-
} else if (TransactionIdIsValid(ts->gtid.xid) && ts->gtid.node == nodeId) { //coordinator of transaction is on disabled node
1598+
#if 0
1599+
} else if (TransactionIdIsValid(ts->gtid.xid) && ts->gtid.node == nodeId) { /* coordinator of transaction is on disabled node */
15991600
if (ts->gid[0] && ts->status != TRANSACTION_STATUS_ABORTED) {
16001601
MtmAbortTransaction(ts);
16011602
MtmTx.status = TRANSACTION_STATUS_ABORTED; /* prevent recursive invocation of MtmAbortPreparedTransaction */
16021603
FinishPreparedTransaction(ts->gid, false);
16031604
}
1605+
#endif
16041606
}
16051607
}
16061608
}
@@ -2213,11 +2215,11 @@ _PG_init(void)
22132215
NULL,
22142216
&MtmConnStrs,
22152217
"",
2216-
PGC_BACKEND, // context
2217-
0, // flags,
2218-
NULL, // GucStringCheckHook check_hook,
2219-
NULL, // GucStringAssignHook assign_hook,
2220-
NULL // GucShowHook show_hook
2218+
PGC_BACKEND, /* context */
2219+
0, /* flags */
2220+
NULL, /* GucStringCheckHook check_hook */
2221+
NULL, /* GucStringAssignHook assign_hook */
2222+
NULL /* GucShowHook show_hook */
22212223
);
22222224

22232225
DefineCustomIntVariable(
@@ -3249,15 +3251,19 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32493251
break;
32503252
case T_DiscardStmt:
32513253
{
3252-
//DiscardStmt *stmt = (DiscardStmt *) parsetree;
3253-
//skipCommand = stmt->target == DISCARD_TEMP;
3254+
/*
3255+
* DiscardStmt *stmt = (DiscardStmt *) parsetree;
3256+
* skipCommand = stmt->target == DISCARD_TEMP;
3257+
*/
32543258

32553259
skipCommand = true;
32563260

32573261
if (MtmGUCBufferAllocated)
32583262
{
3259-
// XXX: move allocation somewhere to backend startup and check
3260-
// where buffer is empty in send routines.
3263+
/*
3264+
* XXX: move allocation somewhere to backend startup and check
3265+
* where buffer is empty in send routines.
3266+
*/
32613267
MtmGUCBufferAllocated = false;
32623268
pfree(MtmGUCBuffer);
32633269
}
@@ -3286,7 +3292,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32863292

32873293
appendStringInfoString(MtmGUCBuffer, queryString);
32883294

3289-
// sometimes there is no ';' char at the end.
3295+
/* sometimes there is no ';' char at the end. */
32903296
appendStringInfoString(MtmGUCBuffer, ";");
32913297
}
32923298
break;

contrib/mmts/pglogical_apply.c

Lines changed: 15 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -59,6 +59,8 @@ typedef struct TupleData
5959
bool changed[MaxTupleAttributeNumber];
6060
} TupleData;
6161

62+
static int MtmTransactionRecords;
63+
6264
static Relation read_rel(StringInfo s, LOCKMODE mode);
6365
static void read_tuple_parts(StringInfo s, Relation rel, TupleData *tup);
6466
static EState* create_rel_estate(Relation rel);
@@ -509,11 +511,16 @@ process_remote_commit(StringInfo in)
509511
csn_t csn;
510512
const char *gid = NULL;
511513
XLogRecPtr end_lsn;
512-
514+
int n_records;
513515
/* read flags */
514516
flags = pq_getmsgbyte(in);
515517
MtmReplicationNodeId = pq_getmsgbyte(in);
516518

519+
n_records = pq_getmsgint(in, 4);
520+
if (MtmTransactionRecords != n_records) {
521+
elog(ERROR, "Transaction %d flags %d contains %d records instead of %d", MtmGetCurrentTransactionId(), flags, MtmTransactionRecords, n_records);
522+
}
523+
517524
/* read fields */
518525
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
519526
end_lsn = pq_getmsgint64(in); /* end_lsn */
@@ -620,6 +627,8 @@ process_remote_insert(StringInfo s, Relation rel)
620627
char* relname = RelationGetRelationName(rel);
621628
int i;
622629

630+
MtmTransactionRecords += 1;
631+
623632
estate = create_rel_estate(rel);
624633
newslot = ExecInitExtraTupleSlot(estate);
625634
oldslot = ExecInitExtraTupleSlot(estate);
@@ -734,6 +743,8 @@ process_remote_update(StringInfo s, Relation rel)
734743
ScanKeyData skey[INDEX_MAX_KEYS];
735744
HeapTuple remote_tuple = NULL;
736745

746+
MtmTransactionRecords += 1;
747+
737748
action = pq_getmsgbyte(s);
738749

739750
/* old key present, identifying key changed */
@@ -851,6 +862,8 @@ process_remote_delete(StringInfo s, Relation rel)
851862
ScanKeyData skey[INDEX_MAX_KEYS];
852863
bool found_old;
853864

865+
MtmTransactionRecords += 1;
866+
854867
estate = create_rel_estate(rel);
855868
oldslot = ExecInitExtraTupleSlot(estate);
856869
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
@@ -938,6 +951,7 @@ void MtmExecutor(int id, void* work, size_t size)
938951
}
939952
MemoryContextSwitchTo(ApplyContext);
940953
replorigin_session_origin = InvalidRepOriginId;
954+
MtmTransactionRecords = 0;
941955
PG_TRY();
942956
{
943957
while (true) {

contrib/mmts/pglogical_proto.c

Lines changed: 22 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
#include "pglogical_relid_map.h"
4040

4141
static bool MtmIsFilteredTxn;
42+
static int MtmTransactionRecords;
4243

4344
static void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel);
4445

@@ -106,7 +107,8 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
106107
{
107108
bool isRecovery = MtmIsRecoveredNode(MtmReplicationNodeId);
108109
csn_t csn = MtmTransactionSnapshot(txn->xid);
109-
MTM_LOG2("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d", MyProcPid, txn->xid, MtmReplicationNodeId, csn, isRecovery);
110+
MTM_LOG1("%d: pglogical_write_begin XID=%d node=%d CSN=%ld recovery=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
111+
MyProcPid, txn->xid, MtmReplicationNodeId, csn, isRecovery, txn->restart_decoding_lsn, txn->first_lsn, txn->end_lsn, MyReplicationSlot->data.confirmed_flush);
110112

111113
if (csn == INVALID_CSN && !isRecovery) {
112114
MtmIsFilteredTxn = true;
@@ -116,6 +118,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
116118
pq_sendint(out, isRecovery ? InvalidTransactionId : txn->xid, 4);
117119
pq_sendint64(out, csn);
118120
MtmIsFilteredTxn = false;
121+
MtmTransactionRecords = 0;
119122
}
120123
}
121124

@@ -128,6 +131,10 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
128131
{
129132
uint8 flags = 0;
130133

134+
MTM_LOG1("%d: pglogical_write_commit XID=%d node=%d restart_decoding_lsn=%lx first_lsn=%lx end_lsn=%lx confirmed_flush=%lx",
135+
MyProcPid, txn->xid, MtmReplicationNodeId, txn->restart_decoding_lsn, txn->first_lsn, txn->end_lsn, MyReplicationSlot->data.confirmed_flush);
136+
137+
131138
if (txn->xact_action == XLOG_XACT_COMMIT)
132139
flags = PGLOGICAL_COMMIT;
133140
else if (txn->xact_action == XLOG_XACT_PREPARE)
@@ -141,6 +148,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
141148

142149
if (flags == PGLOGICAL_COMMIT || flags == PGLOGICAL_PREPARE) {
143150
if (MtmIsFilteredTxn) {
151+
Assert(MtmTransactionRecords == 0);
144152
return;
145153
}
146154
} else {
@@ -152,6 +160,7 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
152160
*/
153161
if (csn == INVALID_CSN && !isRecovery)
154162
{
163+
Assert(MtmTransactionRecords == 0);
155164
return;
156165
}
157166
if (MtmRecoveryCaughtUp(MtmReplicationNodeId, txn->end_lsn)) {
@@ -167,18 +176,23 @@ pglogical_write_commit(StringInfo out, PGLogicalOutputData *data,
167176
pq_sendbyte(out, flags);
168177
pq_sendbyte(out, MtmNodeId);
169178

179+
Assert(txn->xact_action != XLOG_XACT_PREPARE || txn->xid < 1000 || MtmTransactionRecords >= 2);
180+
pq_sendint(out, MtmTransactionRecords, 4);
181+
170182
/* send fixed fields */
171183
pq_sendint64(out, commit_lsn);
172184
pq_sendint64(out, txn->end_lsn);
173185
pq_sendint64(out, txn->commit_time);
174186

175187
if (txn->xact_action == XLOG_XACT_COMMIT_PREPARED) {
188+
Assert(MtmTransactionRecords == 0);
176189
pq_sendint64(out, MtmGetTransactionCSN(txn->xid));
177190
}
178191
if (txn->xact_action != XLOG_XACT_COMMIT) {
179192
pq_sendstring(out, txn->gid);
180193
}
181194

195+
MtmTransactionRecords = 0;
182196
MTM_TXTRACE(txn, "pglogical_write_commit Finish");
183197
}
184198

@@ -190,6 +204,7 @@ pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
190204
Relation rel, HeapTuple newtuple)
191205
{
192206
if (!MtmIsFilteredTxn) {
207+
MtmTransactionRecords += 1;
193208
pq_sendbyte(out, 'I'); /* action INSERT */
194209
pglogical_write_tuple(out, data, rel, newtuple);
195210
}
@@ -203,6 +218,11 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
203218
Relation rel, HeapTuple oldtuple, HeapTuple newtuple)
204219
{
205220
if (!MtmIsFilteredTxn) {
221+
MtmTransactionRecords += 1;
222+
223+
MTM_LOG1("%d: pglogical_write_update confirmed_flush=%lx", MyProcPid, MyReplicationSlot->data.confirmed_flush);
224+
225+
206226
pq_sendbyte(out, 'U'); /* action UPDATE */
207227
/* FIXME support whole tuple (O tuple type) */
208228
if (oldtuple != NULL)
@@ -224,6 +244,7 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
224244
Relation rel, HeapTuple oldtuple)
225245
{
226246
if (!MtmIsFilteredTxn) {
247+
MtmTransactionRecords += 1;
227248
pq_sendbyte(out, 'D'); /* action DELETE */
228249
pglogical_write_tuple(out, data, rel, oldtuple);
229250
}

contrib/mmts/tests2/docker-entrypoint.sh

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ if [ "${1:0:1}" = '-' ]; then
66
set -- postgres "$@"
77
fi
88

9-
#echo "/pg/%p.%s.%c.%P.core" | sudo tee /proc/sys/kernel/core_pattern
9+
echo "/pg/%p.%s.%c.%P.core" | sudo tee /proc/sys/kernel/core_pattern
1010

1111
if [ "$1" = 'postgres' ]; then
1212
mkdir -p "$PGDATA"

contrib/mmts/tests2/test_recovery.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -12,7 +12,7 @@ def setUpClass(self):
1212
"dbname=postgres user=postgres host=127.0.0.1",
1313
"dbname=postgres user=postgres host=127.0.0.1 port=5433",
1414
"dbname=postgres user=postgres host=127.0.0.1 port=5434"
15-
], n_accounts=100000)
15+
], n_accounts=1000)
1616
self.client.bgrun()
1717
time.sleep(5)
1818

src/backend/replication/logical/decode.c

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
#include <time.h>
2929

3030
#include "postgres.h"
31+
#include "miscadmin.h"
3132

3233
#include "access/heapam.h"
3334
#include "access/heapam_xlog.h"
@@ -423,6 +424,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
423424

424425
ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
425426

427+
elog(LOG, "%d: DecodeHeapOp XID=%d, info=%d", MyProcPid, xid, info);
428+
426429
/* no point in doing anything yet */
427430
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
428431
return;
@@ -813,16 +816,20 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
813816
char *data;
814817
RelFileNode target_node;
815818

819+
elog(LOG, "%d: DecodeUpdate XID=%d", MyProcPid);
820+
816821
xlrec = (xl_heap_update *) XLogRecGetData(r);
817822

818823
/* only interested in our database */
819824
XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
820-
if (target_node.dbNode != ctx->slot->data.database)
825+
if (target_node.dbNode != ctx->slot->data.database)
821826
return;
822827

823828
/* output plugin doesn't look for this origin, no need to queue */
824-
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
829+
if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) {
830+
elog(LOG, "%d: DecodeUpdate XID=%d filtered by origin %lx", MyProcPid, XLogRecGetOrigin(r));
825831
return;
832+
}
826833

827834
change = ReorderBufferGetChange(ctx->reorder);
828835
change->action = REORDER_BUFFER_CHANGE_UPDATE;

0 commit comments

Comments
 (0)