Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit b1a45d8

Browse files
committed
Use new twophase decoding API in pglogical_proto
1 parent d5d0986 commit b1a45d8

File tree

3 files changed

+375
-99
lines changed

3 files changed

+375
-99
lines changed

pglogical_output.c

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,23 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
6161
ReorderBufferTXN *txn);
6262
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
6363
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
64+
65+
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
66+
XLogRecPtr lsn);
67+
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
68+
XLogRecPtr lsn);
69+
static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
70+
XLogRecPtr lsn);
71+
72+
static bool pg_filter_decode_txn(LogicalDecodingContext *ctx,
73+
ReorderBufferTXN *txn);
74+
75+
static bool pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
76+
TransactionId xid, const char *gid);
77+
78+
static void pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
79+
XLogRecPtr abort_lsn);
80+
6481
static void pg_decode_change(LogicalDecodingContext *ctx,
6582
ReorderBufferTXN *txn, Relation rel,
6683
ReorderBufferChange *change);
@@ -109,6 +126,14 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
109126
cb->begin_cb = pg_decode_begin_txn;
110127
cb->change_cb = pg_decode_change;
111128
cb->commit_cb = pg_decode_commit_txn;
129+
cb->abort_cb = pg_decode_abort_txn;
130+
131+
cb->filter_prepare_cb = pg_filter_prepare;
132+
cb->filter_decode_txn_cb = pg_filter_decode_txn;
133+
cb->prepare_cb = pg_decode_prepare_txn;
134+
cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
135+
cb->abort_prepared_cb = pg_decode_abort_prepared_txn;
136+
112137
cb->filter_by_origin_cb = pg_decode_origin_filter;
113138
cb->shutdown_cb = pg_decode_shutdown;
114139
cb->message_cb = pg_decode_message;
@@ -475,6 +500,39 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
475500
}
476501
}
477502

503+
void
504+
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
505+
XLogRecPtr lsn)
506+
{
507+
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
508+
509+
MtmOutputPluginPrepareWrite(ctx, true, true);
510+
pglogical_write_prepare(ctx->out, data, txn, lsn);
511+
MtmOutputPluginWrite(ctx, true, true);
512+
}
513+
514+
void
515+
pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
516+
XLogRecPtr lsn)
517+
{
518+
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
519+
520+
MtmOutputPluginPrepareWrite(ctx, true, true);
521+
pglogical_write_commit_prepared(ctx->out, data, txn, lsn);
522+
MtmOutputPluginWrite(ctx, true, true);
523+
}
524+
525+
void
526+
pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
527+
XLogRecPtr lsn)
528+
{
529+
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
530+
531+
MtmOutputPluginPrepareWrite(ctx, true, true);
532+
pglogical_write_abort_prepared(ctx->out, data, txn, lsn);
533+
MtmOutputPluginWrite(ctx, true, true);
534+
}
535+
478536
void
479537
pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
480538
Relation relation, ReorderBufferChange *change)
@@ -611,3 +669,57 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
611669
data->hooks_mctxt = NULL;
612670
}
613671
}
672+
673+
/* Filter out unnecessary two-phase transactions */
674+
static bool
675+
pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
676+
TransactionId xid, const char *gid)
677+
{
678+
// PGLogicalOutputData *data = ctx->output_plugin_private;
679+
680+
/*
681+
* decode all 2PC transactions
682+
*/
683+
return false;
684+
}
685+
686+
/*
687+
* Check if we should continue to decode this transaction.
688+
*
689+
* If it has aborted in the meanwhile, then there's no sense
690+
* in decoding and sending the rest of the changes, we might
691+
* as well ask the subscribers to abort immediately.
692+
*
693+
* This should be called if we are streaming a transaction
694+
* before it's committed or if we are decoding a 2PC
695+
* transaction. Otherwise we always decode committed
696+
* transactions
697+
*
698+
* Additional checks can be added here, as needed
699+
*/
700+
static bool
701+
pg_filter_decode_txn(LogicalDecodingContext *ctx,
702+
ReorderBufferTXN *txn)
703+
{
704+
/*
705+
* Due to caching, repeated TransactionIdDidAbort calls
706+
* shouldn't be that expensive
707+
*/
708+
if (txn != NULL &&
709+
TransactionIdIsValid(txn->xid) &&
710+
TransactionIdDidAbort(txn->xid))
711+
return true;
712+
713+
/* if txn is NULL, filter it out */
714+
return (txn != NULL)? false:true;
715+
}
716+
717+
/* ABORT callback */
718+
static void
719+
pg_decode_abort_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
720+
XLogRecPtr abort_lsn)
721+
{
722+
// PGLogicalOutputData *data = ctx->output_plugin_private;
723+
724+
MTM_LOG1("pg_decode_abort_txn called for " XID_FMT, txn->xid);
725+
}

0 commit comments

Comments
 (0)