Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8d1f69d

Browse files
committed
Merge branch 'PGPROEE10_MULTIMASTER' of https://gitlab.postgrespro.ru/pgpro-dev/postgrespro into PGPROEE10_MULTIMASTER
2 parents 765d063 + e43e651 commit 8d1f69d

File tree

5 files changed

+199
-16
lines changed

5 files changed

+199
-16
lines changed

contrib/mmts/multimaster.c

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ static ExecutorStart_hook_type PreviousExecutorStartHook;
290290
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
291291
static ProcessUtility_hook_type PreviousProcessUtilityHook;
292292
static shmem_startup_hook_type PreviousShmemStartupHook;
293-
// static seq_nextval_hook_t PreviousSeqNextvalHook;
293+
static seq_nextval_hook_t PreviousSeqNextvalHook;
294294

295295
static void MtmExecutorStart(QueryDesc *queryDesc, int eflags);
296296
static void MtmExecutorFinish(QueryDesc *queryDesc);
@@ -3390,8 +3390,8 @@ _PG_init(void)
33903390
PreviousProcessUtilityHook = ProcessUtility_hook;
33913391
ProcessUtility_hook = MtmProcessUtility;
33923392

3393-
// PreviousSeqNextvalHook = SeqNextvalHook;
3394-
// SeqNextvalHook = MtmSeqNextvalHook;
3393+
PreviousSeqNextvalHook = SeqNextvalHook;
3394+
SeqNextvalHook = MtmSeqNextvalHook;
33953395
}
33963396

33973397
/*
@@ -3403,7 +3403,7 @@ _PG_fini(void)
34033403
shmem_startup_hook = PreviousShmemStartupHook;
34043404
ExecutorFinish_hook = PreviousExecutorFinishHook;
34053405
ProcessUtility_hook = PreviousProcessUtilityHook;
3406-
// SeqNextvalHook = PreviousSeqNextvalHook;
3406+
SeqNextvalHook = PreviousSeqNextvalHook;
34073407
}
34083408

34093409

@@ -5089,7 +5089,33 @@ static bool MtmFunctionProfileDependsOnTempTable(CreateFunctionStmt* func)
50895089
return false;
50905090
}
50915091

5092+
static void
5093+
AdjustCreateSequence(List *options)
5094+
{
5095+
bool has_increment = false, has_start = false;
5096+
ListCell *option;
5097+
5098+
foreach(option, options)
5099+
{
5100+
DefElem *defel = (DefElem *) lfirst(option);
5101+
if (strcmp(defel->defname, "increment") == 0)
5102+
has_increment = true;
5103+
else if (strcmp(defel->defname, "start") == 0)
5104+
has_start = true;
5105+
}
50925106

5107+
if (!has_increment)
5108+
{
5109+
DefElem *defel = makeDefElem("increment", (Node *) makeInteger(MtmMaxNodes), -1);
5110+
options = lappend(options, defel);
5111+
}
5112+
5113+
if (!has_start)
5114+
{
5115+
DefElem *defel = makeDefElem("start", (Node *) makeInteger(MtmNodeId), -1);
5116+
options = lappend(options, defel);
5117+
}
5118+
}
50935119

50945120
static void MtmProcessUtility(PlannedStmt *pstmt,
50955121
const char *queryString, ProcessUtilityContext context,
@@ -5162,6 +5188,13 @@ static void MtmProcessUtility(PlannedStmt *pstmt,
51625188
elog(ERROR, "Multimaster doesn't support creating and dropping databases");
51635189
break;
51645190

5191+
case T_CreateSeqStmt:
5192+
{
5193+
CreateSeqStmt *stmt = (CreateSeqStmt *) parsetree;
5194+
AdjustCreateSequence(stmt->options);
5195+
}
5196+
break;
5197+
51655198
case T_CreateTableSpaceStmt:
51665199
case T_DropTableSpaceStmt:
51675200
{

contrib/mmts/pglogical_apply.c

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,18 +1274,18 @@ void MtmExecutor(void* work, size_t size)
12741274
s.len = save_len;
12751275
break;
12761276
}
1277-
// case 'N':
1278-
// {
1279-
// int64 next;
1280-
// Oid relid;
1281-
// Assert(rel != NULL);
1282-
// relid = RelationGetRelid(rel);
1283-
// close_rel(rel);
1284-
// rel = NULL;
1285-
// next = pq_getmsgint64(&s);
1286-
// AdjustSequence(relid, next);
1287-
// break;
1288-
// }
1277+
case 'N':
1278+
{
1279+
int64 next;
1280+
Oid relid;
1281+
Assert(rel != NULL);
1282+
relid = RelationGetRelid(rel);
1283+
close_rel(rel);
1284+
rel = NULL;
1285+
next = pq_getmsgint64(&s);
1286+
AdjustSequence(relid, next);
1287+
break;
1288+
}
12891289
case '0':
12901290
Assert(rel != NULL);
12911291
heap_truncate_one_rel(rel);

contrib/mmts/tests/reinit-mm.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ do
7777
multimaster.arbiter_port = $arbiter_port
7878
multimaster.max_recovery_lag = 30GB
7979
multimaster.referee_connstring = 'dbname=$USER host=127.0.0.1 port=5440 sslmode=disable'
80+
multimaster.monotonic_sequences = on
8081
SQL
8182

8283
cat <<CONF >> tmp_check/node$i/pg_hba.conf

src/backend/commands/sequence.c

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ typedef struct SeqTableData
7575

7676
typedef SeqTableData *SeqTable;
7777

78+
seq_nextval_hook_t SeqNextvalHook;
79+
7880
static HTAB *seqhashtab = NULL; /* hash table for SeqTable items */
7981

8082
/*
@@ -614,6 +616,10 @@ nextval_internal(Oid relid, bool check_permissions)
614616
elm->last += elm->increment;
615617
relation_close(seqrel, NoLock);
616618
last_used_seq = elm;
619+
620+
if (SeqNextvalHook)
621+
SeqNextvalHook(relid, elm->last);
622+
617623
return elm->last;
618624
}
619625

@@ -810,6 +816,9 @@ nextval_internal(Oid relid, bool check_permissions)
810816

811817
relation_close(seqrel, NoLock);
812818

819+
if (SeqNextvalHook)
820+
SeqNextvalHook(relid, result);
821+
813822
return result;
814823
}
815824

@@ -879,6 +888,141 @@ lastval(PG_FUNCTION_ARGS)
879888
PG_RETURN_INT64(result);
880889
}
881890

891+
/*
892+
* Bump last value to next iff next > value.
893+
* Support routine for multimaster's monotonic sequences.
894+
*/
895+
void
896+
AdjustSequence(Oid relid, int64 next)
897+
{
898+
SeqTable elm;
899+
Relation seqrel;
900+
Buffer buf;
901+
HeapTupleData seqdatatuple;
902+
Form_pg_sequence_data seq;
903+
HeapTuple pgstuple;
904+
Form_pg_sequence pgsform;
905+
int64 maxv,
906+
minv,
907+
incby,
908+
cache;
909+
int64 last;
910+
911+
/* open and lock sequence */
912+
init_sequence(relid, &elm, &seqrel);
913+
914+
if (pg_class_aclcheck(elm->relid, GetUserId(), ACL_UPDATE) != ACLCHECK_OK)
915+
ereport(ERROR,
916+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
917+
errmsg("permission denied for sequence %s",
918+
RelationGetRelationName(seqrel))));
919+
920+
pgstuple = SearchSysCache1(SEQRELID, ObjectIdGetDatum(relid));
921+
if (!HeapTupleIsValid(pgstuple))
922+
elog(ERROR, "cache lookup failed for sequence %u", relid);
923+
pgsform = (Form_pg_sequence) GETSTRUCT(pgstuple);
924+
maxv = pgsform->seqmax;
925+
minv = pgsform->seqmin;
926+
incby = pgsform->seqincrement;
927+
cache = pgsform->seqcache;
928+
ReleaseSysCache(pgstuple);
929+
930+
/* cached number is greater than received */
931+
if (elm->last != cache && elm->last + incby > next)
932+
{
933+
relation_close(seqrel, NoLock);
934+
return;
935+
}
936+
937+
/* read-only transactions may only modify temp sequences */
938+
if (!seqrel->rd_islocaltemp)
939+
PreventCommandIfReadOnly("setval()");
940+
941+
/*
942+
* Forbid this during parallel operation because, to make it work, the
943+
* cooperating backends would need to share the backend-local cached
944+
* sequence information. Currently, we don't support that.
945+
*/
946+
PreventCommandIfParallelMode("setval()");
947+
948+
/* lock page' buffer and read tuple */
949+
seq = read_seq_tuple(seqrel, &buf, &seqdatatuple);
950+
951+
if ((next < minv) || (next > maxv))
952+
{
953+
char bufv[100],
954+
bufm[100],
955+
bufx[100];
956+
957+
snprintf(bufv, sizeof(bufv), INT64_FORMAT, next);
958+
snprintf(bufm, sizeof(bufm), INT64_FORMAT, minv);
959+
snprintf(bufx, sizeof(bufx), INT64_FORMAT, maxv);
960+
ereport(ERROR,
961+
(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
962+
errmsg("setval: value %s is out of bounds for sequence \"%s\" (%s..%s)",
963+
bufv, RelationGetRelationName(seqrel),
964+
bufm, bufx)));
965+
}
966+
967+
last = seq->last_value;
968+
if (seq->is_called)
969+
{
970+
last += incby;
971+
}
972+
if (last <= next)
973+
{
974+
next = last + incby*((next - last + incby)/incby);
975+
976+
/* Set the currval() state only if iscalled = true */
977+
if (seq->is_called)
978+
{
979+
elm->last = next; /* last returned number */
980+
elm->last_valid = true;
981+
}
982+
983+
/* In any case, forget any future cached numbers */
984+
elm->cached = elm->last;
985+
986+
/* check the comment above nextval_internal()'s equivalent call. */
987+
if (RelationNeedsWAL(seqrel))
988+
GetTopTransactionId();
989+
990+
/* ready to change the on-disk (or really, in-buffer) tuple */
991+
ptrack_add_block(seqrel, BufferGetBlockNumber(buf));
992+
START_CRIT_SECTION();
993+
994+
seq->last_value = next; /* last fetched number */
995+
seq->log_cnt = 0;
996+
997+
MarkBufferDirty(buf);
998+
999+
/* XLOG stuff */
1000+
if (RelationNeedsWAL(seqrel))
1001+
{
1002+
xl_seq_rec xlrec;
1003+
XLogRecPtr recptr;
1004+
Page page = BufferGetPage(buf);
1005+
1006+
XLogBeginInsert();
1007+
XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT);
1008+
1009+
xlrec.node = seqrel->rd_node;
1010+
XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
1011+
XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
1012+
1013+
recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
1014+
1015+
PageSetLSN(page, recptr);
1016+
}
1017+
1018+
END_CRIT_SECTION();
1019+
}
1020+
1021+
UnlockReleaseBuffer(buf);
1022+
1023+
relation_close(seqrel, NoLock);
1024+
}
1025+
8821026
/*
8831027
* Main internal procedure that handles 2 & 3 arg forms of SETVAL.
8841028
*

src/include/commands/sequence.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,9 @@ extern void seq_desc(StringInfo buf, XLogReaderState *rptr);
7171
extern const char *seq_identify(uint8 info);
7272
extern void seq_mask(char *pagedata, BlockNumber blkno);
7373

74+
typedef void (*seq_nextval_hook_t)(Oid seq_relid, int64 next);
75+
extern seq_nextval_hook_t SeqNextvalHook;
76+
77+
extern void AdjustSequence(Oid relid, int64 next);
78+
7479
#endif /* SEQUENCE_H */

0 commit comments

Comments
 (0)