Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 181d689

Browse files
knizhnikkelvich
authored andcommitted
PGPRO-731 # Table copy fixes
1 parent 432ef92 commit 181d689

7 files changed

+26
-71
lines changed

multimaster.c

+4-2
Original file line numberDiff line numberDiff line change
@@ -4422,7 +4422,8 @@ Datum mtm_broadcast_table(PG_FUNCTION_ARGS)
44224422
MtmCopyRequest copy;
44234423
copy.sourceTable = PG_GETARG_OID(0);
44244424
copy.targetNodes = PG_GETARG_INT64(1);
4425-
LogLogicalMessage("B", (char*)&copy, sizeof(copy), false);
4425+
LogLogicalMessage("B", (char*)&copy, sizeof(copy), true);
4426+
MtmTx.containsDML = true;
44264427
PG_RETURN_VOID();
44274428
}
44284429

@@ -4431,7 +4432,8 @@ Datum mtm_copy_table(PG_FUNCTION_ARGS)
44314432
MtmCopyRequest copy;
44324433
copy.sourceTable = PG_GETARG_OID(0);
44334434
copy.targetNodes = (nodemask_t)1 << (PG_GETARG_INT32(1) - 1);
4434-
LogLogicalMessage("B", (char*)&copy, sizeof(copy), false);
4435+
LogLogicalMessage("B", (char*)&copy, sizeof(copy), true);
4436+
MtmTx.containsDML = true;
44354437
PG_RETURN_VOID();
44364438
}
44374439

pglogical_apply.c

+3-49
Original file line numberDiff line numberDiff line change
@@ -368,49 +368,6 @@ process_remote_begin(StringInfo s)
368368
return true;
369369
}
370370

371-
static void
372-
process_broadcast_table(StringInfo s)
373-
{
374-
Relation rel;
375-
char ch;
376-
EState *estate;
377-
TupleData new_tuple;
378-
TupleTableSlot *newslot;
379-
TupleTableSlot *oldslot;
380-
HeapTuple tup;
381-
382-
StartTransactionCommand();
383-
384-
ch = pq_getmsgbyte(s);
385-
Assert(ch == 'R');
386-
rel = read_rel(s, AccessExclusiveLock);
387-
388-
heap_truncate_one_rel(rel);
389-
390-
estate = create_rel_estate(rel);
391-
newslot = ExecInitExtraTupleSlot(estate);
392-
oldslot = ExecInitExtraTupleSlot(estate);
393-
ExecSetSlotDescriptor(newslot, RelationGetDescr(rel));
394-
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
395-
396-
ExecOpenIndices(estate->es_result_relation_info, false);
397-
398-
while (s->cursor != s->len) {
399-
read_tuple_parts(s, rel, &new_tuple);
400-
tup = heap_form_tuple(RelationGetDescr(rel),
401-
new_tuple.values, new_tuple.isnull);
402-
ExecStoreTuple(tup, newslot, InvalidBuffer, true);
403-
simple_heap_insert(rel, newslot->tts_tuple);
404-
UserTableUpdateOpenIndexes(estate, newslot);
405-
}
406-
407-
ExecCloseIndices(estate->es_result_relation_info);
408-
ExecResetTupleTable(estate->es_tupleTable, true);
409-
FreeExecutorState(estate);
410-
411-
CommitTransactionCommand();
412-
}
413-
414371
static bool
415372
process_remote_message(StringInfo s)
416373
{
@@ -421,12 +378,6 @@ process_remote_message(StringInfo s)
421378

422379
switch (action)
423380
{
424-
case 'B':
425-
{
426-
process_broadcast_table(s);
427-
standalone = true;
428-
break;
429-
}
430381
case 'C':
431382
{
432383
MTM_LOG1("%d: Executing non-tx utility statement %s", MyProcPid, messageBody);
@@ -1206,6 +1157,9 @@ void MtmExecutor(void* work, size_t size)
12061157
s.len = save_len;
12071158
break;
12081159
}
1160+
case '0':
1161+
heap_truncate_one_rel(rel);
1162+
break;
12091163
case 'M':
12101164
{
12111165
close_rel(rel);

pglogical_output.c

+3-3
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ static bool startup_message_sent = false;
8181

8282
#define OUTPUT_BUFFER_SIZE (16*1024*1024)
8383

84-
static void MtmOutputPluginWrite(LogicalDecodingContext *ctx, bool last_write, bool flush)
84+
void MtmOutputPluginWrite(LogicalDecodingContext *ctx, bool last_write, bool flush)
8585
{
8686
if (flush) {
8787
OutputPluginWrite(ctx, last_write);
8888
}
8989
}
9090

91-
static void MtmOutputPluginPrepareWrite(LogicalDecodingContext *ctx, bool last_write, bool flush)
91+
void MtmOutputPluginPrepareWrite(LogicalDecodingContext *ctx, bool last_write, bool flush)
9292
{
9393
if (!ctx->prepared_write) {
9494
OutputPluginPrepareWrite(ctx, last_write);
@@ -557,7 +557,7 @@ pg_decode_message(LogicalDecodingContext *ctx,
557557
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
558558

559559
MtmOutputPluginPrepareWrite(ctx, true, !transactional);
560-
data->api->write_message(ctx->out, data, prefix, sz, message);
560+
data->api->write_message(ctx->out, ctx, prefix, sz, message);
561561
MtmOutputPluginWrite(ctx, true, !transactional);
562562
}
563563

pglogical_output.h

+3
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,7 @@ typedef struct PGLogicalTupleData
102102
bool changed[MaxTupleAttributeNumber];
103103
} PGLogicalTupleData;
104104

105+
extern void MtmOutputPluginWrite(LogicalDecodingContext *ctx, bool last_write, bool flush);
106+
extern void MtmOutputPluginPrepareWrite(LogicalDecodingContext *ctx, bool last_write, bool flush);
107+
105108
#endif /* PG_LOGICAL_OUTPUT_H */

pglogical_proto.c

+10-14
Original file line numberDiff line numberDiff line change
@@ -168,38 +168,34 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168168
}
169169
}
170170

171-
static void pglogical_broadcast_table(StringInfo out, PGLogicalOutputData *data, MtmCopyRequest* copy)
171+
static void pglogical_broadcast_table(StringInfo out, LogicalDecodingContext *ctx, MtmCopyRequest* copy)
172172
{
173173
if (BIT_CHECK(copy->targetNodes, MtmReplicationNodeId-1)) {
174174
HeapScanDesc scandesc;
175175
HeapTuple tuple;
176176
Relation rel;
177177

178-
StartTransactionCommand();
179-
180178
rel = heap_open(copy->sourceTable, ShareLock);
181179

182-
pq_sendbyte(out, 'M');
183-
pq_sendbyte(out, 'B');
184-
pq_sendint(out, sizeof(*copy), 4);
185-
pq_sendbytes(out, (char*)copy, sizeof(*copy));
186-
187-
pglogical_write_rel(out, data, rel);
180+
pglogical_write_rel(out, ctx->output_plugin_private, rel);
181+
182+
pq_sendbyte(out, '0');
188183

189184
scandesc = heap_beginscan(rel, GetTransactionSnapshot(), 0, NULL);
190185
while ((tuple = heap_getnext(scandesc, ForwardScanDirection)) != NULL)
191186
{
192-
pglogical_write_tuple(out, data, rel, tuple);
187+
MtmOutputPluginPrepareWrite(ctx, false, false);
188+
pq_sendbyte(out, 'I'); /* action INSERT */
189+
pglogical_write_tuple(out, ctx->output_plugin_private, rel, tuple);
190+
MtmOutputPluginWrite(ctx, false, false);
193191
}
194192
heap_endscan(scandesc);
195193
heap_close(rel, ShareLock);
196-
197-
CommitTransactionCommand();
198194
}
199195
}
200196

201197
static void
202-
pglogical_write_message(StringInfo out, PGLogicalOutputData *data,
198+
pglogical_write_message(StringInfo out, LogicalDecodingContext *ctx,
203199
const char *prefix, Size sz, const char *message)
204200
{
205201
MtmLastRelId = InvalidOid;
@@ -231,7 +227,7 @@ pglogical_write_message(StringInfo out, PGLogicalOutputData *data,
231227
*/
232228
return;
233229
case 'B':
234-
pglogical_broadcast_table(out, data, (MtmCopyRequest*)message);
230+
pglogical_broadcast_table(out, ctx, (MtmCopyRequest*)message);
235231
return;
236232
}
237233
pq_sendbyte(out, 'M');

pglogical_proto.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedef void (*pglogical_write_begin_fn)(StringInfo out, struct PGLogicalOutputData *data,
2323
ReorderBufferTXN *txn);
24-
typedef void (*pglogical_write_message_fn)(StringInfo out, struct PGLogicalOutputData *data,
24+
typedef void (*pglogical_write_message_fn)(StringInfo out, LogicalDecodingContext *ctx,
2525
const char *prefix, Size sz, const char *message);
2626
typedef void (*pglogical_write_commit_fn)(StringInfo out, struct PGLogicalOutputData *data,
2727
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);

pglogical_receiver.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -552,9 +552,9 @@ pglogical_receiver_main(Datum main_arg)
552552
MtmSpillToFile(spill_file, buf.data, buf.used);
553553
ByteBufferReset(&buf);
554554
}
555-
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'B' || stmt[1] == 'C'))) {
555+
if (stmt[0] == 'Z' || (stmt[0] == 'M' && (stmt[1] == 'L' || stmt[1] == 'A' || stmt[1] == 'C'))) {
556556
MTM_LOG3("Process '%c' message from %d", stmt[1], nodeId);
557-
if (stmt[0] == 'M' && (stmt[1] == 'B' || stmt[1] == 'C')) { /* concurrent DDL should be executed by parallel workers */
557+
if (stmt[0] == 'M' && stmt[1] == 'C') { /* concurrent DDL should be executed by parallel workers */
558558
MtmExecute(stmt, msg_len);
559559
} else {
560560
MtmExecutor(stmt, msg_len); /* all other messages can be processed by receiver itself */

0 commit comments

Comments
 (0)