CommitTransactionCommand();
}
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+}
+
/* Common function to setup the leader apply or tablesync worker. */
void
SetupApplyOrSyncWorker(int worker_slot)
InitializeLogRepWorker();
+ /*
+ * Register a callback to reset the origin state before aborting any
+ * pending transaction during shutdown (see ShutdownPostgres()). This will
+ * avoid origin advancement for an in-complete transaction which could
+ * otherwise lead to its loss as such a transaction won't be sent by the
+ * server again.
+ *
+ * Note that even a LOG or DEBUG statement placed after setting the origin
+ * state may process a shutdown signal before committing the current apply
+ * operation. So, it is important to register such a callback here.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+ int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
+ elevel = geterrlevel();
+
+ /*
+ * Reset the origin state to prevent the advancement of origin progress if
+ * we fail to apply. Otherwise, this will result in transaction loss as
+ * that transaction won't be sent again by the server.
+ */
+ if (elevel >= ERROR)
+ replorigin_reset(0, (Datum) 0);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
return edata->sqlerrcode;
}
+/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+ ErrorData *edata = &errordata[errordata_stack_depth];
+
+ /* we don't bother incrementing recursion_depth */
+ CHECK_STACK_DEPTH();
+
+ return edata->elevel;
+}
+
/*
* geterrposition --- return the currently set error position (0 if none)
*
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->append_conf('postgresql.conf',
- qq(max_prepared_transactions = 10));
+ qq(max_prepared_transactions = 0));
$node_subscriber->start;
# Create some pre-existing content on publisher
# then COMMIT PREPARED
###############################
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
$node_publisher->safe_psql(
'postgres', "
BEGIN;
INSERT INTO tab_full VALUES (11);
PREPARE TRANSACTION 'test_prepared_tab_full';");
+# Confirm the ERROR is reported becasue max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
$node_publisher->wait_for_catchup($appname);
# check that transaction is in prepared state on subscriber