Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 63cf61c

Browse files
author
Amit Kapila
committed
Add prepare API support for streaming transactions in logical replication.
Commit a8fd13c added support for prepared transactions to built-in logical replication via a new option "two_phase" for a subscription. The "two_phase" option was not allowed with the existing streaming option. This commit permits the combination of "streaming" and "two_phase" subscription options. It extends the pgoutput plugin and the subscriber side code to add the prepare API for streaming transactions which will apply the changes accumulated in the spool-file at prepare time. Author: Peter Smith and Ajin Cherian Reviewed-by: Vignesh C, Amit Kapila, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
1 parent 6424337 commit 63cf61c

File tree

12 files changed

+667
-79
lines changed

12 files changed

+667
-79
lines changed

doc/src/sgml/logicaldecoding.sgml

+10-1
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,9 @@ OutputPluginWrite(ctx, true);
11991199
<function>stream_abort_cb</function>, <function>stream_commit_cb</function>
12001200
and <function>stream_change_cb</function>) and two optional callbacks
12011201
(<function>stream_message_cb</function> and <function>stream_truncate_cb</function>).
1202+
Also, if streaming of two-phase commands is to be supported, then additional
1203+
callbacks must be provided. (See <xref linkend="logicaldecoding-two-phase-commits"/>
1204+
for details).
12021205
</para>
12031206

12041207
<para>
@@ -1237,7 +1240,13 @@ stream_start_cb(...); &lt;-- start of second block of changes
12371240
stream_change_cb(...);
12381241
stream_stop_cb(...); &lt;-- end of second block of changes
12391242

1240-
stream_commit_cb(...); &lt;-- commit of the streamed transaction
1243+
1244+
[a. when using normal commit]
1245+
stream_commit_cb(...); &lt;-- commit of the streamed transaction
1246+
1247+
[b. when using two-phase commit]
1248+
stream_prepare_cb(...); &lt;-- prepare the streamed transaction
1249+
commit_prepared_cb(...); &lt;-- commit of the prepared transaction
12411250
</programlisting>
12421251
</para>
12431252

doc/src/sgml/protocol.sgml

+75-1
Original file line numberDiff line numberDiff line change
@@ -7411,7 +7411,7 @@ Stream Abort
74117411
</variablelist>
74127412

74137413
<para>
7414-
The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared)
7414+
The following messages (Begin Prepare, Prepare, Commit Prepared, Rollback Prepared, Stream Prepare)
74157415
are available since protocol version 3.
74167416
</para>
74177417

@@ -7714,6 +7714,80 @@ are available since protocol version 3.
77147714
</listitem>
77157715
</varlistentry>
77167716

7717+
<varlistentry>
7718+
7719+
<term>Stream Prepare</term>
7720+
<listitem>
7721+
<para>
7722+
7723+
<variablelist>
7724+
7725+
<varlistentry>
7726+
<term>Byte1('p')</term>
7727+
<listitem><para>
7728+
Identifies the message as a two-phase stream prepare message.
7729+
</para></listitem>
7730+
</varlistentry>
7731+
7732+
<varlistentry>
7733+
<term>
7734+
Int8(0)
7735+
</term>
7736+
<listitem><para>
7737+
Flags; currently unused.
7738+
</para></listitem>
7739+
</varlistentry>
7740+
7741+
<varlistentry>
7742+
<term>
7743+
Int64 (XLogRecPtr)
7744+
</term>
7745+
<listitem><para>
7746+
The LSN of the prepare.
7747+
</para></listitem>
7748+
</varlistentry>
7749+
7750+
<varlistentry>
7751+
<term>
7752+
Int64 (XLogRecPtr)
7753+
</term>
7754+
<listitem><para>
7755+
The end LSN of the prepare transaction.
7756+
</para></listitem>
7757+
</varlistentry>
7758+
7759+
<varlistentry>
7760+
<term>
7761+
Int64 (TimestampTz)
7762+
</term>
7763+
<listitem><para>
7764+
Prepare timestamp of the transaction. The value is in number
7765+
of microseconds since PostgreSQL epoch (2000-01-01).
7766+
</para></listitem>
7767+
</varlistentry>
7768+
7769+
<varlistentry>
7770+
<term>
7771+
Int32 (TransactionId)
7772+
</term>
7773+
<listitem><para>
7774+
Xid of the transaction.
7775+
</para></listitem>
7776+
</varlistentry>
7777+
7778+
<varlistentry>
7779+
<term>String</term>
7780+
<listitem><para>
7781+
The user defined GID of the two-phase transaction.
7782+
</para></listitem>
7783+
</varlistentry>
7784+
7785+
</variablelist>
7786+
7787+
</para>
7788+
</listitem>
7789+
</varlistentry>
7790+
77177791
</variablelist>
77187792

77197793
<para>

doc/src/sgml/ref/create_subscription.sgml

-10
Original file line numberDiff line numberDiff line change
@@ -238,11 +238,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
238238
subscriber as a whole.
239239
</para>
240240

241-
<para>
242-
The <literal>streaming</literal> option cannot be used with the
243-
<literal>two_phase</literal> option.
244-
</para>
245-
246241
</listitem>
247242
</varlistentry>
248243
<varlistentry>
@@ -269,11 +264,6 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
269264
to know the actual two-phase state.
270265
</para>
271266

272-
<para>
273-
The <literal>two_phase</literal> option cannot be used with the
274-
<literal>streaming</literal> option.
275-
</para>
276-
277267
</listitem>
278268
</varlistentry>
279269
</variablelist></para>

src/backend/commands/subscriptioncmds.c

-25
Original file line numberDiff line numberDiff line change
@@ -335,25 +335,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
335335
errmsg("subscription with %s must also set %s",
336336
"slot_name = NONE", "create_slot = false")));
337337
}
338-
339-
/*
340-
* Do additional checking for the disallowed combination of two_phase and
341-
* streaming. While streaming and two_phase can theoretically be
342-
* supported, it needs more analysis to allow them together.
343-
*/
344-
if (opts->twophase &&
345-
IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
346-
IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
347-
{
348-
if (opts->streaming &&
349-
IsSet(supported_opts, SUBOPT_STREAMING) &&
350-
IsSet(opts->specified_opts, SUBOPT_STREAMING))
351-
ereport(ERROR,
352-
(errcode(ERRCODE_SYNTAX_ERROR),
353-
/*- translator: both %s are strings of the form "option = value" */
354-
errmsg("%s and %s are mutually exclusive options",
355-
"two_phase = true", "streaming = true")));
356-
}
357338
}
358339

359340
/*
@@ -933,12 +914,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
933914

934915
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
935916
{
936-
if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming)
937-
ereport(ERROR,
938-
(errcode(ERRCODE_SYNTAX_ERROR),
939-
errmsg("cannot set %s for two-phase enabled subscription",
940-
"streaming = true")));
941-
942917
values[Anum_pg_subscription_substream - 1] =
943918
BoolGetDatum(opts.streaming);
944919
replaces[Anum_pg_subscription_substream - 1] = true;

src/backend/replication/logical/proto.c

+27-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,8 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
145145
}
146146

147147
/*
148-
* The core functionality for logicalrep_write_prepare.
148+
* The core functionality for logicalrep_write_prepare and
149+
* logicalrep_write_stream_prepare.
149150
*/
150151
static void
151152
logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
@@ -188,7 +189,8 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
188189
}
189190

190191
/*
191-
* The core functionality for logicalrep_read_prepare.
192+
* The core functionality for logicalrep_read_prepare and
193+
* logicalrep_read_stream_prepare.
192194
*/
193195
static void
194196
logicalrep_read_prepare_common(StringInfo in, char *msgtype,
@@ -209,6 +211,8 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype,
209211
elog(ERROR, "end_lsn is not set in %s message", msgtype);
210212
prepare_data->prepare_time = pq_getmsgint64(in);
211213
prepare_data->xid = pq_getmsgint(in, 4);
214+
if (prepare_data->xid == InvalidTransactionId)
215+
elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
212216

213217
/* read gid (copy it into a pre-allocated buffer) */
214218
strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
@@ -339,6 +343,27 @@ logicalrep_read_rollback_prepared(StringInfo in,
339343
strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
340344
}
341345

346+
/*
347+
* Write STREAM PREPARE to the output stream.
348+
*/
349+
void
350+
logicalrep_write_stream_prepare(StringInfo out,
351+
ReorderBufferTXN *txn,
352+
XLogRecPtr prepare_lsn)
353+
{
354+
logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
355+
txn, prepare_lsn);
356+
}
357+
358+
/*
359+
* Read STREAM PREPARE from the stream.
360+
*/
361+
void
362+
logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
363+
{
364+
logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
365+
}
366+
342367
/*
343368
* Write ORIGIN to the output stream.
344369
*/

src/backend/replication/logical/worker.c

+55-1
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,56 @@ apply_handle_rollback_prepared(StringInfo s)
10521052
pgstat_report_activity(STATE_IDLE, NULL);
10531053
}
10541054

1055+
/*
1056+
* Handle STREAM PREPARE.
1057+
*
1058+
* Logic is in two parts:
1059+
* 1. Replay all the spooled operations
1060+
* 2. Mark the transaction as prepared
1061+
*/
1062+
static void
1063+
apply_handle_stream_prepare(StringInfo s)
1064+
{
1065+
LogicalRepPreparedTxnData prepare_data;
1066+
1067+
if (in_streamed_transaction)
1068+
ereport(ERROR,
1069+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1070+
errmsg_internal("STREAM PREPARE message without STREAM STOP")));
1071+
1072+
/* Tablesync should never receive prepare. */
1073+
if (am_tablesync_worker())
1074+
ereport(ERROR,
1075+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1076+
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
1077+
1078+
logicalrep_read_stream_prepare(s, &prepare_data);
1079+
1080+
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
1081+
1082+
/* Replay all the spooled operations. */
1083+
apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
1084+
1085+
/* Mark the transaction as prepared. */
1086+
apply_handle_prepare_internal(&prepare_data);
1087+
1088+
CommitTransactionCommand();
1089+
1090+
pgstat_report_stat(false);
1091+
1092+
store_flush_position(prepare_data.end_lsn);
1093+
1094+
in_remote_transaction = false;
1095+
1096+
/* unlink the files with serialized changes and subxact info. */
1097+
stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
1098+
1099+
/* Process any tables that are being synchronized in parallel. */
1100+
process_syncing_tables(prepare_data.end_lsn);
1101+
1102+
pgstat_report_activity(STATE_IDLE, NULL);
1103+
}
1104+
10551105
/*
10561106
* Handle ORIGIN message.
10571107
*
@@ -1291,7 +1341,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
12911341
*/
12921342
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
12931343

1294-
/* open the spool file for the committed transaction */
1344+
/* Open the spool file for the committed/prepared transaction */
12951345
changes_filename(path, MyLogicalRepWorker->subid, xid);
12961346
elog(DEBUG1, "replaying changes from file \"%s\"", path);
12971347

@@ -2357,6 +2407,10 @@ apply_dispatch(StringInfo s)
23572407
case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
23582408
apply_handle_rollback_prepared(s);
23592409
return;
2410+
2411+
case LOGICAL_REP_MSG_STREAM_PREPARE:
2412+
apply_handle_stream_prepare(s);
2413+
return;
23602414
}
23612415

23622416
ereport(ERROR,

src/backend/replication/pgoutput/pgoutput.c

+21-12
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
7171
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
7272
ReorderBufferTXN *txn,
7373
XLogRecPtr commit_lsn);
74+
static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
75+
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
7476

7577
static bool publications_valid;
7678
static bool in_streaming;
@@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
175177
cb->stream_message_cb = pgoutput_message;
176178
cb->stream_truncate_cb = pgoutput_truncate;
177179
/* transaction streaming - two-phase commit */
178-
cb->stream_prepare_cb = NULL;
180+
cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
179181
}
180182

181183
static void
@@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data)
280282
}
281283
else
282284
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
283-
284-
/*
285-
* Do additional checking for the disallowed combination of two_phase
286-
* and streaming. While streaming and two_phase can theoretically be
287-
* supported, it needs more analysis to allow them together.
288-
*/
289-
if (data->two_phase && data->streaming)
290-
ereport(ERROR,
291-
(errcode(ERRCODE_SYNTAX_ERROR),
292-
errmsg("%s and %s are mutually exclusive options",
293-
"two_phase", "streaming")));
294285
}
295286
}
296287

@@ -1029,6 +1020,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
10291020
cleanup_rel_sync_cache(txn->xid, true);
10301021
}
10311022

1023+
/*
1024+
* PREPARE callback (for streaming two-phase commit).
1025+
*
1026+
* Notify the downstream to prepare the transaction.
1027+
*/
1028+
static void
1029+
pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
1030+
ReorderBufferTXN *txn,
1031+
XLogRecPtr prepare_lsn)
1032+
{
1033+
Assert(rbtxn_is_streamed(txn));
1034+
1035+
OutputPluginUpdateProgress(ctx);
1036+
OutputPluginPrepareWrite(ctx, true);
1037+
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
1038+
OutputPluginWrite(ctx, true);
1039+
}
1040+
10321041
/*
10331042
* Initialize the relation schema sync cache for a decoding session.
10341043
*

src/include/replication/logicalproto.h

+6-2
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ typedef enum LogicalRepMsgType
6767
LOGICAL_REP_MSG_STREAM_START = 'S',
6868
LOGICAL_REP_MSG_STREAM_END = 'E',
6969
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
70-
LOGICAL_REP_MSG_STREAM_ABORT = 'A'
70+
LOGICAL_REP_MSG_STREAM_ABORT = 'A',
71+
LOGICAL_REP_MSG_STREAM_PREPARE = 'p'
7172
} LogicalRepMsgType;
7273

7374
/*
@@ -196,7 +197,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN
196197
TimestampTz prepare_time);
197198
extern void logicalrep_read_rollback_prepared(StringInfo in,
198199
LogicalRepRollbackPreparedTxnData *rollback_data);
199-
200+
extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
201+
XLogRecPtr prepare_lsn);
202+
extern void logicalrep_read_stream_prepare(StringInfo in,
203+
LogicalRepPreparedTxnData *prepare_data);
200204

201205
extern void logicalrep_write_origin(StringInfo out, const char *origin,
202206
XLogRecPtr origin_lsn);

0 commit comments

Comments
 (0)