Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 76741a1

Browse files
knizhnikkelvich
authored andcommitted
Support monotonic sequences
1 parent 181d689 commit 76741a1

File tree

4 files changed

+64
-0
lines changed

4 files changed

+64
-0
lines changed

multimaster.c

+32
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "utils/memutils.h"
4848
#include "commands/dbcommands.h"
4949
#include "commands/extension.h"
50+
#include "commands/sequence.h"
5051
#include "postmaster/autovacuum.h"
5152
#include "storage/pmsignal.h"
5253
#include "storage/proc.h"
@@ -267,11 +268,13 @@ static bool MtmBreakConnection;
267268
static bool MtmClusterLocked;
268269
static bool MtmInsideTransaction;
269270
static bool MtmReferee;
271+
static bool MtmMonotonicSequences;
270272

271273
static ExecutorStart_hook_type PreviousExecutorStartHook;
272274
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
273275
static ProcessUtility_hook_type PreviousProcessUtilityHook;
274276
static shmem_startup_hook_type PreviousShmemStartupHook;
277+
static seq_nextval_hook_t PreviousSeqNextvalHook;
275278

276279
static nodemask_t lastKnownMatrix[MAX_NODES];
277280

@@ -280,6 +283,7 @@ static void MtmExecutorFinish(QueryDesc *queryDesc);
280283
static void MtmProcessUtility(Node *parsetree, const char *queryString,
281284
ProcessUtilityContext context, ParamListInfo params,
282285
DestReceiver *dest, char *completionTag);
286+
static void MtmSeqNextvalHook(Oid seqid, int64 next);
283287

284288
static bool MtmAtExitHookRegistered = false;
285289

@@ -3137,6 +3141,19 @@ _PG_init(void)
31373141
NULL
31383142
);
31393143

3144+
DefineCustomBoolVariable(
3145+
"multimaster.monotonic_sequences",
3146+
"Enforce monotinic behaviour of sequence values obtained from different nodes",
3147+
NULL,
3148+
&MtmMonotonicSequences,
3149+
false,
3150+
PGC_BACKEND,
3151+
0,
3152+
NULL,
3153+
NULL,
3154+
NULL
3155+
);
3156+
31403157
DefineCustomBoolVariable(
31413158
"multimaster.ignore_tables_without_pk",
31423159
"Do not replicate tables without primary key",
@@ -3404,6 +3421,9 @@ _PG_init(void)
34043421

34053422
PreviousProcessUtilityHook = ProcessUtility_hook;
34063423
ProcessUtility_hook = MtmProcessUtility;
3424+
3425+
PreviousSeqNextvalHook = SeqNextvalHook;
3426+
SeqNextvalHook = MtmSeqNextvalHook;
34073427
}
34083428

34093429
/*
@@ -3415,6 +3435,7 @@ _PG_fini(void)
34153435
shmem_startup_hook = PreviousShmemStartupHook;
34163436
ExecutorFinish_hook = PreviousExecutorFinishHook;
34173437
ProcessUtility_hook = PreviousProcessUtilityHook;
3438+
SeqNextvalHook = PreviousSeqNextvalHook;
34183439
}
34193440

34203441

@@ -5352,6 +5373,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
53525373
}
53535374
}
53545375

5376+
static void MtmSeqNextvalHook(Oid seqid, int64 next)
5377+
{
5378+
if (MtmMonotonicSequences)
5379+
{
5380+
MtmSeqPosition pos;
5381+
pos.seqid = seqid;
5382+
pos.next = next;
5383+
LogLogicalMessage("N", (char*)&pos, sizeof(pos), true);
5384+
}
5385+
}
5386+
53555387
/*
53565388
* -------------------------------------------
53575389
* Executor pool interface

multimaster.h

+6
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,12 @@ typedef struct MtmFlushPosition
337337
} MtmFlushPosition;
338338

339339

340+
typedef struct MtmSeqPosition
341+
{
342+
Oid seqid;
343+
int64 next;
344+
} MtmSeqPosition;
345+
340346
#define MtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
341347

342348
extern char const* const MtmNodeStatusMnem[];

pglogical_apply.c

+12
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "commands/vacuum.h"
2323
#include "commands/tablespace.h"
2424
#include "commands/defrem.h"
25+
#include "commands/sequence.h"
2526
#include "parser/parse_utilcmd.h"
2627

2728
#include "libpq/pqformat.h"
@@ -1157,7 +1158,18 @@ void MtmExecutor(void* work, size_t size)
11571158
s.len = save_len;
11581159
break;
11591160
}
1161+
case 'N':
1162+
{
1163+
int64 next;
1164+
Assert(rel != NULL);
1165+
next = pq_getmsgint64(&s);
1166+
AdjustSequence(RelationGetRelid(rel), next);
1167+
close_rel(rel);
1168+
rel = NULL;
1169+
break;
1170+
}
11601171
case '0':
1172+
Assert(rel != NULL);
11611173
heap_truncate_one_rel(rel);
11621174
break;
11631175
case 'M':

pglogical_proto.c

+14
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,17 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168168
}
169169
}
170170

171+
172+
static void pglogical_seq_nextval(StringInfo out, LogicalDecodingContext *ctx, MtmSeqPosition* pos)
173+
{
174+
Relation rel = heap_open(pos->seqid, NoLock);
175+
pglogical_write_rel(out, ctx->output_plugin_private, rel);
176+
heap_close(rel, NoLock);
177+
pq_sendbyte(out, 'N');
178+
pq_sendint64(out, pos->next);
179+
}
180+
181+
171182
static void pglogical_broadcast_table(StringInfo out, LogicalDecodingContext *ctx, MtmCopyRequest* copy)
172183
{
173184
if (BIT_CHECK(copy->targetNodes, MtmReplicationNodeId-1)) {
@@ -229,6 +240,9 @@ pglogical_write_message(StringInfo out, LogicalDecodingContext *ctx,
229240
case 'B':
230241
pglogical_broadcast_table(out, ctx, (MtmCopyRequest*)message);
231242
return;
243+
case 'N':
244+
pglogical_seq_nextval(out, ctx, (MtmSeqPosition*)message);
245+
return;
232246
}
233247
pq_sendbyte(out, 'M');
234248
pq_sendbyte(out, *prefix);

0 commit comments

Comments
 (0)