Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit db9f742

Browse files
author
Andrei Krichinin
committed
[PGPRO-5845] OutputPluginCallbacks::begin_prepare_cb
1 parent 77a38b2 commit db9f742

File tree

2 files changed

+46
-4
lines changed

2 files changed

+46
-4
lines changed

src/pglogical_output.c

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
6969
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
7070
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
7171

72+
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
7273
static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
7374
XLogRecPtr lsn);
7475
static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -135,6 +136,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
135136
cb->abort_cb = pg_decode_abort_txn;
136137

137138
cb->filter_prepare_cb = pg_filter_prepare;
139+
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
138140
cb->prepare_cb = pg_decode_prepare_txn;
139141
cb->commit_prepared_cb = pg_decode_commit_prepared_txn;
140142
cb->rollback_prepared_cb = pg_decode_abort_prepared_txn;
@@ -482,6 +484,46 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
482484
}
483485
}
484486

487+
void
488+
pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
489+
{
490+
PGLogicalOutputData *data = (PGLogicalOutputData *) ctx->output_plugin_private;
491+
bool send_replication_origin = data->forward_changeset_origins;
492+
493+
if (!startup_message_sent)
494+
send_startup_message(ctx, data, false /* can't be last message */ );
495+
496+
/* If the record didn't originate locally, send origin info */
497+
send_replication_origin &= txn->origin_id != InvalidRepOriginId;
498+
499+
if (data->api)
500+
{
501+
MtmOutputPluginPrepareWrite(ctx, !send_replication_origin, true);
502+
data->api->write_begin(ctx->out, data, txn);
503+
504+
if (send_replication_origin)
505+
{
506+
char *origin;
507+
508+
/* Message boundary */
509+
MtmOutputPluginWrite(ctx, false, false);
510+
MtmOutputPluginPrepareWrite(ctx, true, false);
511+
512+
/*
513+
* XXX: which behaviour we want here?
514+
*
515+
* Alternatives: - don't send origin message if origin name not
516+
* found (that's what we do now) - throw error - that will break
517+
* replication, not good - send some special "unknown" origin
518+
*/
519+
if (data->api->write_origin &&
520+
replorigin_by_oid(txn->origin_id, true, &origin))
521+
data->api->write_origin(ctx->out, origin, txn->origin_lsn);
522+
}
523+
MtmOutputPluginWrite(ctx, true, false);
524+
}
525+
}
526+
485527
void
486528
pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
487529
XLogRecPtr lsn)

src/syncpoint.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,20 +381,20 @@ GetRecoveryHorizon(int sender_node_id)
381381

382382
sql = psprintf("select mtm.get_recovery_horizon(%d)", sender_node_id);
383383
rc = SPI_execute(sql, true, 0);
384-
mtm_log(LOG, "----> GetRecoveryHorizont: sql '%s'", sql);
384+
mtm_log(LOG, "----> GetRecoveryHorizon: sql '%s'", sql);
385385

386386
if (rc != SPI_OK_SELECT && SPI_processed != 1)
387387
mtm_log(ERROR, "mtm.get_recovery_horizon failed: rc=%d, SPI_processed=" UINT64_FORMAT,
388388
rc, SPI_processed);
389389

390390
tup = SPI_tuptable->vals[0];
391-
mtm_log(LOG, "----> GetRecoveryHorizont: %d", SPI_tuptable->tupdesc->natts);
391+
mtm_log(LOG, "----> GetRecoveryHorizon: %d", SPI_tuptable->tupdesc->natts);
392392

393393
type = SPI_gettype(SPI_tuptable->tupdesc, 1);
394-
mtm_log(LOG, "----> GetRecoveryHorizont: type %s", type ? type: "NULL");
394+
mtm_log(LOG, "----> GetRecoveryHorizon: type %s", type ? type: "NULL");
395395

396396
value = SPI_getvalue(tup, SPI_tuptable->tupdesc, 1);
397-
mtm_log(LOG, "----> GetRecoveryHorizont: value %s", value ? value: "NULL");
397+
mtm_log(LOG, "----> GetRecoveryHorizon: value %s", value ? value: "NULL");
398398

399399
horizon = DatumGetLSN(SPI_getbinval(tup, SPI_tuptable->tupdesc, 1, &isnull));
400400
if (isnull)

0 commit comments

Comments
 (0)