/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
- TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
- .ts = 0,
};
static MemoryContext ApplyMessageContext = NULL;
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid);
static inline void reset_apply_error_context_info(void);
/*
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.final_lsn;
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.prepare_lsn;
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid);
subidx = -1;
begin_replication_step();
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid);
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid)
{
apply_error_callback_arg.remote_xid = xid;
- apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId);
}