Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0621b7b

Browse files
knizhnikkelvich
authored andcommitted
Spill applied transactions to the disk
1 parent b48c191 commit 0621b7b

File tree

7 files changed

+217
-72
lines changed

7 files changed

+217
-72
lines changed

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o
33

44
override CPPFLAGS += -I../raftable
55

@@ -15,7 +15,7 @@ all: multimaster.so
1515
tests/dtmbench:
1616
make -C tests
1717

18-
PG_CPPFLAGS = -I$(libpq_srcdir) -DUSE_PGLOGICAL_OUTPUT
18+
PG_CPPFLAGS = -I$(libpq_srcdir)
1919
SHLIB_LINK = $(libpq)
2020

2121
ifdef USE_PGXS

multimaster.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ int MtmConnectTimeout;
184184
int MtmKeepaliveTimeout;
185185
int MtmReconnectAttempts;
186186
int MtmNodeDisableDelay;
187+
int MtmTransSpillThreshold;
187188
bool MtmUseRaftable;
188189
bool MtmUseDtm;
189190
MtmConnectionInfo* MtmConnections;
@@ -1247,6 +1248,7 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
12471248
matrix[i] |= ((matrix[j] >> i) & 1) << j;
12481249
matrix[j] |= ((matrix[i] >> j) & 1) << i;
12491250
}
1251+
matrix[i] &= ~((nodemask_t)1 << i);
12501252
}
12511253
return true;
12521254
}
@@ -1630,6 +1632,21 @@ _PG_init(void)
16301632
if (!process_shared_preload_libraries_in_progress)
16311633
return;
16321634

1635+
DefineCustomIntVariable(
1636+
"multimaster.trans_spill_threshold",
1637+
"Maximal size (Mb) of transaction after which transaction is written to the disk",
1638+
NULL,
1639+
&MtmTransSpillThreshold,
1640+
1000, /* 1Gb */
1641+
0,
1642+
INT_MAX,
1643+
PGC_BACKEND,
1644+
0,
1645+
NULL,
1646+
NULL,
1647+
NULL
1648+
);
1649+
16331650
DefineCustomIntVariable(
16341651
"multimaster.twopc_min_timeout",
16351652
"Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes",

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ extern int MtmConnectTimeout;
205205
extern int MtmReconnectAttempts;
206206
extern int MtmKeepaliveTimeout;
207207
extern int MtmNodeDisableDelay;
208+
extern int MtmTransSpillThreshold;
208209
extern bool MtmUseDtm;
209210
extern HTAB* MtmXid2State;
210211

pglogical_apply.c

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
#include "multimaster.h"
5252
#include "pglogical_relid_map.h"
53+
#include "spill.h"
5354

5455
typedef struct TupleData
5556
{
@@ -901,6 +902,9 @@ void MtmExecutor(int id, void* work, size_t size)
901902
{
902903
StringInfoData s;
903904
Relation rel = NULL;
905+
int spill_file = -1;
906+
int save_cursor;
907+
int save_len;
904908
s.data = work;
905909
s.len = size;
906910
s.maxlen = -1;
@@ -944,6 +948,33 @@ void MtmExecutor(int id, void* work, size_t size)
944948
case 'R':
945949
rel = read_rel(&s, RowExclusiveLock);
946950
continue;
951+
case 'F':
952+
{
953+
int node_id = pq_getmsgint(&s, 4);
954+
int file_id = pq_getmsgint(&s, 4);
955+
Assert(spill_file < 0);
956+
spill_file = MtmOpenSpillFile(node_id, file_id);
957+
continue;
958+
}
959+
case '(':
960+
{
961+
int64 size = pq_getmsgint(&s, 4);
962+
s.data = palloc(size);
963+
save_cursor = s.cursor;
964+
save_len = s.len;
965+
s.cursor = 0;
966+
s.len = size;
967+
MtmReadSpillFile(spill_file, s.data, size);
968+
continue;
969+
}
970+
case ')':
971+
{
972+
pfree(s.data);
973+
s.data = work;
974+
s.cursor = save_cursor;
975+
s.len = save_len;
976+
continue;
977+
}
947978
default:
948979
elog(ERROR, "unknown action of type %c", action);
949980
}
@@ -963,7 +994,9 @@ void MtmExecutor(int id, void* work, size_t size)
963994
MTM_LOG2("%d: REMOTE end abort transaction %d", MyProcPid, MtmGetCurrentTransactionId());
964995
}
965996
PG_END_TRY();
966-
997+
if (spill_file >= 0) {
998+
MtmCloseSpillFile(spill_file);
999+
}
9671000
MemoryContextResetAndDeleteChildren(ApplyContext);
9681001
}
9691002

pglogical_receiver.c

Lines changed: 38 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "access/clog.h"
2525
#include "access/transam.h"
2626
#include "lib/stringinfo.h"
27+
#include "libpq/pqformat.h"
2728
#include "pgstat.h"
2829
#include "postmaster/bgworker.h"
2930
#include "storage/ipc.h"
@@ -35,6 +36,7 @@
3536
#include "replication/origin.h"
3637

3738
#include "multimaster.h"
39+
#include "spill.h"
3840

3941
/* Allow load of this module in shared libs */
4042

@@ -213,20 +215,23 @@ pglogical_receiver_main(Datum main_arg)
213215
PGresult *res;
214216
MtmSlotMode mode;
215217

216-
#ifndef USE_PGLOGICAL_OUTPUT
217-
bool insideTrans = false;
218-
#endif
219218
ByteBuffer buf;
220219
XLogRecPtr originStartPos = 0;
221220
RepOriginId originId;
222221
char* originName;
223222
/* Buffer for COPY data */
224223
char *copybuf = NULL;
224+
int spill_file = -1;
225+
StringInfoData spill_info;
226+
227+
initStringInfo(&spill_info);
225228

226229
/* Register functions for SIGTERM/SIGHUP management */
227230
pqsignal(SIGHUP, receiver_raw_sighup);
228231
pqsignal(SIGTERM, receiver_raw_sigterm);
229232

233+
MtmCreateSpillDirectory(args->remote_node);
234+
230235
sprintf(worker_proc, "mtm_pglogical_receiver_%d_%d", args->local_node, args->remote_node);
231236

232237
/* We're now ready to receive signals */
@@ -449,34 +454,38 @@ pglogical_receiver_main(Datum main_arg)
449454
if (rc > hdr_len)
450455
{
451456
stmt = copybuf + hdr_len;
452-
453-
#ifdef USE_PGLOGICAL_OUTPUT
457+
458+
if (buf.used >= MtmTransSpillThreshold) {
459+
if (spill_file < 0) {
460+
int file_id;
461+
spill_file = MtmCreateSpillFile(args->remote_node, &file_id);
462+
pq_sendbyte(&spill_info, 'F');
463+
pq_sendint(&spill_info, args->remote_node, 4);
464+
pq_sendint(&spill_info, file_id, 4);
465+
}
466+
ByteBufferAppend(&buf, ")", 1);
467+
pq_sendbyte(&spill_info, '(');
468+
pq_sendint(&spill_info, buf.used, 4);
469+
MtmSpillToFile(spill_file, buf.data, buf.used);
470+
ByteBufferReset(&buf);
471+
}
454472
ByteBufferAppend(&buf, stmt, rc - hdr_len);
455473
if (stmt[0] == 'C') /* commit */
456-
{
457-
MtmExecute(buf.data, buf.used);
474+
{
475+
if (spill_file >= 0) {
476+
ByteBufferAppend(&buf, ")", 1);
477+
pq_sendbyte(&spill_info, '(');
478+
pq_sendint(&spill_info, buf.used, 4);
479+
MtmSpillToFile(spill_file, buf.data, buf.used);
480+
MtmCloseSpillFile(spill_file);
481+
MtmExecute(spill_info.data, spill_info.len);
482+
spill_file = -1;
483+
resetStringInfo(&spill_info);
484+
} else {
485+
MtmExecute(buf.data, buf.used);
486+
}
458487
ByteBufferReset(&buf);
459488
}
460-
#else
461-
if (strncmp(stmt, "BEGIN ", 6) == 0) {
462-
TransactionId xid;
463-
int rc = sscanf(stmt + 6, "%u", &xid);
464-
Assert(rc == 1);
465-
ByteBufferAppendInt32(&buf, xid);
466-
Assert(!insideTrans);
467-
insideTrans = true;
468-
} else if (strncmp(stmt, "COMMIT;", 7) == 0) {
469-
Assert(insideTrans);
470-
Assert(buf.used > 4);
471-
buf.data[buf.used-1] = '\0'; /* replace last ';' with '\0' to make string zero terminated */
472-
MMExecute(buf.data, buf.used);
473-
ByteBufferReset(&buf);
474-
insideTrans = false;
475-
} else {
476-
Assert(insideTrans);
477-
ByteBufferAppend(&buf, stmt, rc - hdr_len/*strlen(stmt)*/);
478-
}
479-
#endif
480489
}
481490
/* Update written position */
482491
output_written_lsn = Max(walEnd, output_written_lsn);
@@ -575,6 +584,7 @@ void MtmStartReceivers(void)
575584
{
576585
int i;
577586
BackgroundWorker worker;
587+
578588
MemSet(&worker, 0, sizeof(BackgroundWorker));
579589
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
580590
worker.bgw_start_time = BgWorkerStart_ConsistentState;
@@ -586,6 +596,7 @@ void MtmStartReceivers(void)
586596
ReceiverArgs* ctx = (ReceiverArgs*)palloc(sizeof(ReceiverArgs));
587597
ctx->receiver_conn_string = psprintf("replication=database %s", MtmConnections[i].connStr);
588598
sprintf(ctx->receiver_slot, MULTIMASTER_SLOT_PATTERN, MtmNodeId);
599+
589600
ctx->local_node = MtmNodeId;
590601
ctx->remote_node = i+1;
591602

@@ -598,45 +609,3 @@ void MtmStartReceivers(void)
598609
}
599610
}
600611

601-
#ifndef USE_PGLOGICAL_OUTPUT
602-
void MMExecutor(int id, void* work, size_t size)
603-
{
604-
TransactionId xid = *(TransactionId*)work;
605-
char* stmts = (char*)work + 4;
606-
bool finished = false;
607-
608-
MMJoinTransaction(xid);
609-
610-
SetCurrentStatementStartTimestamp();
611-
StartTransactionCommand();
612-
SPI_connect();
613-
PushActiveSnapshot(GetTransactionSnapshot());
614-
615-
PG_TRY();
616-
{
617-
int rc = SPI_execute(stmts, false, 0);
618-
SPI_finish();
619-
PopActiveSnapshot();
620-
finished = true;
621-
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE) {
622-
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
623-
id, xid)));
624-
AbortCurrentTransaction();
625-
} else {
626-
CommitTransactionCommand();
627-
}
628-
}
629-
PG_CATCH();
630-
{
631-
FlushErrorState();
632-
if (!finished) {
633-
SPI_finish();
634-
if (ActiveSnapshotSet()) {
635-
PopActiveSnapshot();
636-
}
637-
}
638-
AbortCurrentTransaction();
639-
}
640-
PG_END_TRY();
641-
}
642-
#endif

0 commit comments

Comments
 (0)