Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit d7e9e02

Browse files
committed
Support monotonic sequences
1 parent 905ce98 commit d7e9e02

File tree

6 files changed

+161
-0
lines changed

6 files changed

+161
-0
lines changed

contrib/mmts/multimaster.c

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include "utils/memutils.h"
4848
#include "commands/dbcommands.h"
4949
#include "commands/extension.h"
50+
#include "commands/sequence.h"
5051
#include "postmaster/autovacuum.h"
5152
#include "storage/pmsignal.h"
5253
#include "storage/proc.h"
@@ -266,11 +267,13 @@ static bool MtmBreakConnection;
266267
static bool MtmClusterLocked;
267268
static bool MtmInsideTransaction;
268269
static bool MtmReferee;
270+
static bool MtmMonotonicSequences;
269271

270272
static ExecutorStart_hook_type PreviousExecutorStartHook;
271273
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
272274
static ProcessUtility_hook_type PreviousProcessUtilityHook;
273275
static shmem_startup_hook_type PreviousShmemStartupHook;
276+
static seq_nextval_hook_t PreviousSeqNextvalHook;
274277

275278
static nodemask_t lastKnownMatrix[MAX_NODES];
276279

@@ -279,6 +282,7 @@ static void MtmExecutorFinish(QueryDesc *queryDesc);
279282
static void MtmProcessUtility(Node *parsetree, const char *queryString,
280283
ProcessUtilityContext context, ParamListInfo params,
281284
DestReceiver *dest, char *completionTag);
285+
static void MtmSeqNextvalHook(Oid seqid, int64 next);
282286

283287
static bool MtmAtExitHookRegistered = false;
284288

@@ -3136,6 +3140,19 @@ _PG_init(void)
31363140
NULL
31373141
);
31383142

3143+
DefineCustomBoolVariable(
3144+
"multimaster.monotonic_sequences",
3145+
"Enforce monotinic behaviour of sequence values obtained from different nodes",
3146+
NULL,
3147+
&MtmMonotonicSequences,
3148+
false,
3149+
PGC_BACKEND,
3150+
0,
3151+
NULL,
3152+
NULL,
3153+
NULL
3154+
);
3155+
31393156
DefineCustomBoolVariable(
31403157
"multimaster.ignore_tables_without_pk",
31413158
"Do not replicate tables without primary key",
@@ -3390,6 +3407,9 @@ _PG_init(void)
33903407

33913408
PreviousProcessUtilityHook = ProcessUtility_hook;
33923409
ProcessUtility_hook = MtmProcessUtility;
3410+
3411+
PreviousSeqNextvalHook = SeqNextvalHook;
3412+
SeqNextvalHook = MtmSeqNextvalHook;
33933413
}
33943414

33953415
/*
@@ -3401,6 +3421,7 @@ _PG_fini(void)
34013421
shmem_startup_hook = PreviousShmemStartupHook;
34023422
ExecutorFinish_hook = PreviousExecutorFinishHook;
34033423
ProcessUtility_hook = PreviousProcessUtilityHook;
3424+
SeqNextvalHook = PreviousSeqNextvalHook;
34043425
}
34053426

34063427

@@ -5338,6 +5359,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
53385359
}
53395360
}
53405361

5362+
static void MtmSeqNextvalHook(Oid seqid, int64 next)
5363+
{
5364+
if (MtmMonotonicSequences)
5365+
{
5366+
MtmSeqPosition pos;
5367+
pos.seqid = seqid;
5368+
pos.next = next;
5369+
LogLogicalMessage("N", (char*)&pos, sizeof(pos), true);
5370+
}
5371+
}
5372+
53415373
/*
53425374
* -------------------------------------------
53435375
* Executor pool interface

contrib/mmts/multimaster.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,12 @@ typedef struct MtmFlushPosition
337337
} MtmFlushPosition;
338338

339339

340+
typedef struct MtmSeqPosition
341+
{
342+
Oid seqid;
343+
int64 next;
344+
} MtmSeqPosition;
345+
340346
#define MtmIsCoordinator(ts) (ts->gtid.node == MtmNodeId)
341347

342348
extern char const* const MtmNodeStatusMnem[];

contrib/mmts/pglogical_apply.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "commands/vacuum.h"
2323
#include "commands/tablespace.h"
2424
#include "commands/defrem.h"
25+
#include "commands/sequence.h"
2526
#include "parser/parse_utilcmd.h"
2627

2728
#include "libpq/pqformat.h"
@@ -1157,7 +1158,18 @@ void MtmExecutor(void* work, size_t size)
11571158
s.len = save_len;
11581159
break;
11591160
}
1161+
case 'N':
1162+
{
1163+
int64 next;
1164+
Assert(rel != NULL);
1165+
next = pq_getmsgint64(&s);
1166+
AdjustSequence(RelationGetRelid(rel), next);
1167+
close_rel(rel);
1168+
rel = NULL;
1169+
break;
1170+
}
11601171
case '0':
1172+
Assert(rel != NULL);
11611173
heap_truncate_one_rel(rel);
11621174
break;
11631175
case 'M':

contrib/mmts/pglogical_proto.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,17 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
168168
}
169169
}
170170

171+
172+
static void pglogical_seq_nextval(StringInfo out, LogicalDecodingContext *ctx, MtmSeqPosition* pos)
173+
{
174+
Relation rel = heap_open(pos->seqid, NoLock);
175+
pglogical_write_rel(out, ctx->output_plugin_private, rel);
176+
heap_close(rel, NoLock);
177+
pq_sendbyte(out, 'N');
178+
pq_sendint64(out, pos->next);
179+
}
180+
181+
171182
static void pglogical_broadcast_table(StringInfo out, LogicalDecodingContext *ctx, MtmCopyRequest* copy)
172183
{
173184
if (BIT_CHECK(copy->targetNodes, MtmReplicationNodeId-1)) {
@@ -229,6 +240,9 @@ pglogical_write_message(StringInfo out, LogicalDecodingContext *ctx,
229240
case 'B':
230241
pglogical_broadcast_table(out, ctx, (MtmCopyRequest*)message);
231242
return;
243+
case 'N':
244+
pglogical_seq_nextval(out, ctx, (MtmSeqPosition*)message);
245+
return;
232246
}
233247
pq_sendbyte(out, 'M');
234248
pq_sendbyte(out, *prefix);

src/backend/commands/sequence.c

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ typedef struct SeqTableData
8080

8181
typedef SeqTableData *SeqTable;
8282

83+
seq_nextval_hook_t SeqNextvalHook;
84+
8385
static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
8486

8587
/*
@@ -585,6 +587,10 @@ nextval_internal(Oid relid)
585587
elm->last += elm->increment;
586588
relation_close(seqrel, NoLock);
587589
last_used_seq = elm;
590+
591+
if (SeqNextvalHook)
592+
SeqNextvalHook(relid, elm->last);
593+
588594
return elm->last;
589595
}
590596

@@ -771,6 +777,9 @@ nextval_internal(Oid relid)
771777

772778
relation_close(seqrel, NoLock);
773779

780+
if (SeqNextvalHook)
781+
SeqNextvalHook(relid, result);
782+
774783
return result;
775784
}
776785

@@ -840,6 +849,87 @@ lastval(PG_FUNCTION_ARGS)
840849
PG_RETURN_INT64(result);
841850
}
842851

852+
void AdjustSequence(Oid relid, int64 next)
853+
{
854+
SeqTable elm;
855+
Relation seqrel;
856+
Buffer buf;
857+
int64 last;
858+
HeapTupleData seqtuple;
859+
Form_pg_sequence seq;
860+
861+
/* open and AccessShareLock sequence */
862+
init_sequence(relid, &elm, &seqrel);
863+
864+
/* lock page' buffer and read tuple */
865+
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
866+
867+
if (elm->last != elm->cached && elm->last + elm->increment > next) /* cached number is greater than received */
868+
{
869+
relation_close(seqrel, NoLock);
870+
return;
871+
}
872+
873+
/* lock page' buffer and read tuple */
874+
seq = read_seq_tuple(elm, seqrel, &buf, &seqtuple);
875+
876+
last = seq->last_value;
877+
if (seq->is_called)
878+
{
879+
last += elm->increment;
880+
}
881+
if (last <= next)
882+
{
883+
Assert(next >= seq->min_value && next <= seq->max_value);
884+
885+
/* Set the currval() state only if iscalled = true */
886+
if (seq->is_called)
887+
{
888+
elm->last = next; /* last returned number */
889+
elm->last_valid = true;
890+
}
891+
892+
/* In any case, forget any future cached numbers */
893+
elm->cached = elm->last;
894+
895+
/* check the comment above nextval_internal()'s equivalent call. */
896+
if (RelationNeedsWAL(seqrel))
897+
GetTopTransactionId();
898+
899+
START_CRIT_SECTION();
900+
901+
seq->last_value = next; /* last fetched number */
902+
seq->log_cnt = 0;
903+
904+
MarkBufferDirty(buf);
905+
906+
/* XLOG stuff */
907+
if (RelationNeedsWAL(seqrel))
908+
{
909+
xl_seq_rec xlrec;
910+
XLogRecPtr recptr;
911+
Page page = BufferGetPage(buf);
912+
913+
XLogBeginInsert();
914+
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
915+
916+
xlrec.node = seqrel->rd_node;
917+
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
918+
XLogRegisterData((char *) seqtuple.t_data, seqtuple.t_len);
919+
920+
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
921+
922+
PageSetLSN(page, recptr);
923+
}
924+
END_CRIT_SECTION();
925+
}
926+
927+
UnlockReleaseBuffer(buf);
928+
929+
relation_close(seqrel, NoLock);
930+
}
931+
932+
843933
/*
844934
* Main internal procedure that handles 2 & 3 arg forms of SETVAL.
845935
*

src/include/commands/sequence.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ extern Datum setval_oid(PG_FUNCTION_ARGS);
7171
extern Datum setval3_oid(PG_FUNCTION_ARGS);
7272
extern Datum lastval(PG_FUNCTION_ARGS);
7373

74+
extern void AdjustSequence(Oid seqid, int64 next);
75+
7476
extern Datum pg_sequence_parameters(PG_FUNCTION_ARGS);
7577

7678
extern ObjectAddress DefineSequence(CreateSeqStmt *stmt);
@@ -82,4 +84,9 @@ extern void seq_redo(XLogReaderState *rptr);
8284
extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
8385
extern const char *seq_identify(uint8 info);
8486

87+
typedef void (*seq_nextval_hook_t)(Oid seq_relid, int64 next);
88+
89+
extern seq_nextval_hook_t SeqNextvalHook;
90+
91+
8592
#endif /* SEQUENCE_H */

0 commit comments

Comments
 (0)