Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9d2d0c4

Browse files
committed
Use batch insert in multimaster apply
1 parent 3e4fed0 commit 9d2d0c4

File tree

1 file changed

+119
-52
lines changed

1 file changed

+119
-52
lines changed

contrib/mmts/pglogical_apply.c

Lines changed: 119 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -881,6 +881,15 @@ process_remote_commit(StringInfo in)
881881
MtmUpdateLsnMapping(MtmReplicationNodeId, end_lsn);
882882
}
883883

884+
static int
885+
pq_peekmsgbyte(StringInfo msg)
886+
{
887+
return (msg->cursor < msg->len) ? (unsigned char) msg->data[msg->cursor] : EOF;
888+
}
889+
890+
#define MAX_BUFFERED_TUPLES 1024
891+
#define MAX_BUFFERED_TUPLES_SIZE 0x10000
892+
884893
static void
885894
process_remote_insert(StringInfo s, Relation rel)
886895
{
@@ -891,84 +900,142 @@ process_remote_insert(StringInfo s, Relation rel)
891900
ResultRelInfo *relinfo;
892901
ScanKey *index_keys;
893902
int i;
903+
TupleDesc tupDesc = RelationGetDescr(rel);
904+
HeapTuple tup;
894905

895906
PushActiveSnapshot(GetTransactionSnapshot());
896907

897908
estate = create_rel_estate(rel);
898909
newslot = ExecInitExtraTupleSlot(estate);
899910
oldslot = ExecInitExtraTupleSlot(estate);
900-
ExecSetSlotDescriptor(newslot, RelationGetDescr(rel));
901-
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
911+
ExecSetSlotDescriptor(newslot, tupDesc);
912+
ExecSetSlotDescriptor(oldslot, tupDesc);
913+
914+
ExecOpenIndices(estate->es_result_relation_info, false);
915+
relinfo = estate->es_result_relation_info;
902916

903917
read_tuple_parts(s, rel, &new_tuple);
918+
919+
if (pq_peekmsgbyte(s) == 'I')
904920
{
905-
HeapTuple tup;
906-
tup = heap_form_tuple(RelationGetDescr(rel),
921+
/* Use bulk insert */
922+
BulkInsertState bistate = GetBulkInsertState();
923+
HeapTuple bufferedTuples[MAX_BUFFERED_TUPLES];
924+
MemoryContext oldcontext;
925+
int nBufferedTuples;
926+
size_t bufferedTuplesSize;
927+
CommandId mycid = GetCurrentCommandId(true);
928+
929+
bufferedTuples[0] = heap_form_tuple(tupDesc, new_tuple.values, new_tuple.isnull);
930+
bufferedTuplesSize = bufferedTuples[0]->t_len;
931+
932+
for (nBufferedTuples = 1; nBufferedTuples < MAX_BUFFERED_TUPLES && bufferedTuplesSize < MAX_BUFFERED_TUPLES_SIZE; nBufferedTuples++)
933+
{
934+
int action = pq_getmsgbyte(s);
935+
Assert(action == 'I');
936+
read_tuple_parts(s, rel, &new_tuple);
937+
bufferedTuples[nBufferedTuples] = heap_form_tuple(tupDesc,
938+
new_tuple.values, new_tuple.isnull);
939+
if (pq_peekmsgbyte(s) != 'I')
940+
break;
941+
}
942+
/*
943+
* heap_multi_insert leaks memory, so switch to short-lived memory context
944+
* before calling it.
945+
*/
946+
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
947+
heap_multi_insert(rel,
948+
bufferedTuples,
949+
nBufferedTuples,
950+
mycid,
951+
0,
952+
bistate);
953+
MemoryContextSwitchTo(oldcontext);
954+
955+
/*
956+
* If there are any indexes, update them for all the inserted tuples, and
957+
* run AFTER ROW INSERT triggers.
958+
*/
959+
if (relinfo->ri_NumIndices > 0)
960+
{
961+
for (i = 0; i < nBufferedTuples; i++)
962+
{
963+
List *recheckIndexes;
964+
965+
ExecStoreTuple(bufferedTuples[i], newslot, InvalidBuffer, false);
966+
recheckIndexes =
967+
ExecInsertIndexTuples(newslot, &bufferedTuples[i]->t_self,
968+
estate, false, NULL, NIL);
969+
list_free(recheckIndexes);
970+
}
971+
}
972+
973+
FreeBulkInsertState(bistate);
974+
} else {
975+
/* Insert single tuple */
976+
977+
tup = heap_form_tuple(tupDesc,
907978
new_tuple.values, new_tuple.isnull);
908979
ExecStoreTuple(tup, newslot, InvalidBuffer, true);
909-
}
910980

911-
// if (rel->rd_rel->relkind != RELKIND_RELATION) // RELKIND_MATVIEW
912-
// MTM_ELOG(ERROR, "unexpected relkind '%c' rel \"%s\"",
913-
// rel->rd_rel->relkind, RelationGetRelationName(rel));
981+
// if (rel->rd_rel->relkind != RELKIND_RELATION) // RELKIND_MATVIEW
982+
// MTM_ELOG(ERROR, "unexpected relkind '%c' rel \"%s\"",
983+
// rel->rd_rel->relkind, RelationGetRelationName(rel));
914984

915-
/* debug output */
985+
/* debug output */
916986
#ifdef VERBOSE_INSERT
917-
log_tuple("INSERT:%s", RelationGetDescr(rel), newslot->tts_tuple);
987+
log_tuple("INSERT:%s", tupDesc, newslot->tts_tuple);
918988
#endif
919989

920-
/*
921-
* Search for conflicting tuples.
922-
*/
923-
ExecOpenIndices(estate->es_result_relation_info, false);
924-
relinfo = estate->es_result_relation_info;
925-
index_keys = palloc0(relinfo->ri_NumIndices * sizeof(ScanKeyData*));
926-
927-
build_index_scan_keys(estate, index_keys, &new_tuple);
928-
929-
/* do a SnapshotDirty search for conflicting tuples */
930-
for (i = 0; i < relinfo->ri_NumIndices; i++)
931-
{
932-
IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
933-
bool found = false;
934-
935990
/*
936-
* Only unique indexes are of interest here, and we can't deal with
937-
* expression indexes so far. FIXME: predicates should be handled
938-
* better.
939-
*
940-
* NB: Needs to match expression in build_index_scan_key
991+
* Search for conflicting tuples.
941992
*/
942-
if (!ii->ii_Unique || ii->ii_Expressions != NIL)
943-
continue;
993+
index_keys = palloc0(relinfo->ri_NumIndices * sizeof(ScanKeyData*));
944994

945-
if (index_keys[i] == NULL)
946-
continue;
995+
build_index_scan_keys(estate, index_keys, &new_tuple);
996+
997+
/* do a SnapshotDirty search for conflicting tuples */
998+
for (i = 0; i < relinfo->ri_NumIndices; i++)
999+
{
1000+
IndexInfo *ii = relinfo->ri_IndexRelationInfo[i];
1001+
bool found = false;
9471002

948-
Assert(ii->ii_Expressions == NIL);
1003+
/*
1004+
* Only unique indexes are of interest here, and we can't deal with
1005+
* expression indexes so far. FIXME: predicates should be handled
1006+
* better.
1007+
*
1008+
* NB: Needs to match expression in build_index_scan_key
1009+
*/
1010+
if (!ii->ii_Unique || ii->ii_Expressions != NIL)
1011+
continue;
9491012

950-
/* if conflict: wait */
951-
found = find_pkey_tuple(index_keys[i],
952-
rel, relinfo->ri_IndexRelationDescs[i],
953-
oldslot, true, LockTupleExclusive);
1013+
if (index_keys[i] == NULL)
1014+
continue;
9541015

955-
/* alert if there's more than one conflicting unique key */
956-
if (found)
957-
{
958-
/* TODO: Report tuple identity in log */
959-
ereport(ERROR,
960-
(errcode(ERRCODE_UNIQUE_VIOLATION),
961-
MTM_ERRMSG("Unique constraints violated by remotely INSERTed tuple in %s", RelationGetRelationName(rel)),
962-
errdetail("Cannot apply transaction because remotely INSERTed tuple conflicts with a local tuple on UNIQUE constraint and/or PRIMARY KEY")));
1016+
Assert(ii->ii_Expressions == NIL);
1017+
1018+
/* if conflict: wait */
1019+
found = find_pkey_tuple(index_keys[i],
1020+
rel, relinfo->ri_IndexRelationDescs[i],
1021+
oldslot, true, LockTupleExclusive);
1022+
/* alert if there's more than one conflicting unique key */
1023+
if (found)
1024+
{
1025+
/* TODO: Report tuple identity in log */
1026+
ereport(ERROR,
1027+
(errcode(ERRCODE_UNIQUE_VIOLATION),
1028+
MTM_ERRMSG("Unique constraints violated by remotely INSERTed tuple in %s", RelationGetRelationName(rel)),
1029+
errdetail("Cannot apply transaction because remotely INSERTed tuple conflicts with a local tuple on UNIQUE constraint and/or PRIMARY KEY")));
1030+
}
1031+
CHECK_FOR_INTERRUPTS();
9631032
}
964-
CHECK_FOR_INTERRUPTS();
965-
}
9661033

967-
simple_heap_insert(rel, newslot->tts_tuple);
968-
UserTableUpdateOpenIndexes(estate, newslot);
1034+
simple_heap_insert(rel, newslot->tts_tuple);
1035+
UserTableUpdateOpenIndexes(estate, newslot);
9691036

1037+
}
9701038
ExecCloseIndices(estate->es_result_relation_info);
971-
9721039
if (ActiveSnapshotSet())
9731040
PopActiveSnapshot();
9741041

0 commit comments

Comments
 (0)