@@ -290,7 +290,7 @@ static ExecutorStart_hook_type PreviousExecutorStartHook;
290
290
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
291
291
static ProcessUtility_hook_type PreviousProcessUtilityHook ;
292
292
static shmem_startup_hook_type PreviousShmemStartupHook ;
293
- // static seq_nextval_hook_t PreviousSeqNextvalHook;
293
+ static seq_nextval_hook_t PreviousSeqNextvalHook ;
294
294
295
295
static void MtmExecutorStart (QueryDesc * queryDesc , int eflags );
296
296
static void MtmExecutorFinish (QueryDesc * queryDesc );
@@ -602,14 +602,16 @@ csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, nodemask_
602
602
MtmTransState * ts = (MtmTransState * )hash_search (MtmXid2State , & xid , HASH_FIND , NULL );
603
603
if (ts != NULL ) {
604
604
* participantsMask = ts -> participantsMask ;
605
- /* If node is disables, then we are in a process of recovery of this node */
606
- if (!ts -> isLocal && BIT_CHECK (ts -> participantsMask |Mtm -> disabledNodeMask , nodeId - 1 )) {
605
+ if (!ts -> isLocal )
607
606
snapshot = ts -> snapshot ;
608
- Assert (ts -> gtid .node == MtmNodeId || MtmIsRecoverySession );
609
- } else {
610
- MTM_LOG1 ("Do not send transaction %s (%llu) to node %d participants mask %llx" ,
611
- ts -> gid , (long64 )ts -> xid , nodeId , ts -> participantsMask );
612
- }
607
+ // /* If node is disables, then we are in a process of recovery of this node */
608
+ // if (!ts->isLocal && BIT_CHECK(ts->participantsMask|Mtm->disabledNodeMask, nodeId-1)) {
609
+ // snapshot = ts->snapshot;
610
+ // Assert(ts->gtid.node == MtmNodeId || MtmIsRecoverySession);
611
+ // } else {
612
+ // MTM_LOG1("Do not send transaction %s (%llu) to node %d participants mask %llx",
613
+ // ts->gid, (long64)ts->xid, nodeId, ts->participantsMask);
614
+ // }
613
615
}
614
616
}
615
617
MtmUnlock ();
@@ -3389,8 +3391,8 @@ _PG_init(void)
3389
3391
PreviousProcessUtilityHook = ProcessUtility_hook ;
3390
3392
ProcessUtility_hook = MtmProcessUtility ;
3391
3393
3392
- // PreviousSeqNextvalHook = SeqNextvalHook;
3393
- // SeqNextvalHook = MtmSeqNextvalHook;
3394
+ PreviousSeqNextvalHook = SeqNextvalHook ;
3395
+ SeqNextvalHook = MtmSeqNextvalHook ;
3394
3396
}
3395
3397
3396
3398
/*
@@ -3402,7 +3404,7 @@ _PG_fini(void)
3402
3404
shmem_startup_hook = PreviousShmemStartupHook ;
3403
3405
ExecutorFinish_hook = PreviousExecutorFinishHook ;
3404
3406
ProcessUtility_hook = PreviousProcessUtilityHook ;
3405
- // SeqNextvalHook = PreviousSeqNextvalHook;
3407
+ SeqNextvalHook = PreviousSeqNextvalHook ;
3406
3408
}
3407
3409
3408
3410
@@ -4462,7 +4464,7 @@ Datum mtm_broadcast_table(PG_FUNCTION_ARGS)
4462
4464
{
4463
4465
MtmCopyRequest copy ;
4464
4466
copy .sourceTable = PG_GETARG_OID (0 );
4465
- copy .targetNodes = PG_GETARG_INT64 ( 1 ) ;
4467
+ copy .targetNodes = ~ Mtm -> disabledNodeMask ;
4466
4468
LogLogicalMessage ("B" , (char * )& copy , sizeof (copy ), true);
4467
4469
MtmTx .containsDML = true;
4468
4470
PG_RETURN_VOID ();
@@ -4724,7 +4726,7 @@ static void
4724
4726
MtmGenerateGid (char * gid )
4725
4727
{
4726
4728
static int localCount ;
4727
- sprintf (gid , "MTM-%d-%d-%d-%ld" , MtmNodeId , MyProcPid , ++ localCount , (int64 ) GetCurrentTimestamp ());
4729
+ sprintf (gid , "MTM-%d-%d-%d-" INT64_FORMAT , MtmNodeId , MyProcPid , ++ localCount , (int64 ) GetCurrentTimestamp ());
4728
4730
}
4729
4731
4730
4732
/*
@@ -5088,7 +5090,33 @@ static bool MtmFunctionProfileDependsOnTempTable(CreateFunctionStmt* func)
5088
5090
return false;
5089
5091
}
5090
5092
5093
+ static void
5094
+ AdjustCreateSequence (List * options )
5095
+ {
5096
+ bool has_increment = false, has_start = false;
5097
+ ListCell * option ;
5091
5098
5099
+ foreach (option , options )
5100
+ {
5101
+ DefElem * defel = (DefElem * ) lfirst (option );
5102
+ if (strcmp (defel -> defname , "increment" ) == 0 )
5103
+ has_increment = true;
5104
+ else if (strcmp (defel -> defname , "start" ) == 0 )
5105
+ has_start = true;
5106
+ }
5107
+
5108
+ if (!has_increment )
5109
+ {
5110
+ DefElem * defel = makeDefElem ("increment" , (Node * ) makeInteger (MtmMaxNodes ), -1 );
5111
+ options = lappend (options , defel );
5112
+ }
5113
+
5114
+ if (!has_start )
5115
+ {
5116
+ DefElem * defel = makeDefElem ("start" , (Node * ) makeInteger (MtmNodeId ), -1 );
5117
+ options = lappend (options , defel );
5118
+ }
5119
+ }
5092
5120
5093
5121
static void MtmProcessUtility (PlannedStmt * pstmt ,
5094
5122
const char * queryString , ProcessUtilityContext context ,
@@ -5161,6 +5189,14 @@ static void MtmProcessUtility(PlannedStmt *pstmt,
5161
5189
elog (ERROR , "Multimaster doesn't support creating and dropping databases" );
5162
5190
break ;
5163
5191
5192
+ case T_CreateSeqStmt :
5193
+ {
5194
+ CreateSeqStmt * stmt = (CreateSeqStmt * ) parsetree ;
5195
+ if (!MtmVolksWagenMode )
5196
+ AdjustCreateSequence (stmt -> options );
5197
+ }
5198
+ break ;
5199
+
5164
5200
case T_CreateTableSpaceStmt :
5165
5201
case T_DropTableSpaceStmt :
5166
5202
{
@@ -5453,11 +5489,9 @@ static void MtmProcessUtility(PlannedStmt *pstmt,
5453
5489
if (relid != InvalidOid ) {
5454
5490
Oid constraint_oid ;
5455
5491
Bitmapset * pk = get_primary_key_attnos (relid , true, & constraint_oid );
5456
- if (pk == NULL && !MtmVolksWagenMode ) {
5492
+ if (pk == NULL && !MtmVolksWagenMode && MtmIgnoreTablesWithoutPk ) {
5457
5493
elog (WARNING ,
5458
- MtmIgnoreTablesWithoutPk
5459
- ? "Table %s.%s without primary will not be replicated"
5460
- : "Updates and deletes of table %s.%s without primary will not be replicated" ,
5494
+ "Table %s.%s without primary will not be replicated" ,
5461
5495
create -> relation -> schemaname ? create -> relation -> schemaname : "public" ,
5462
5496
create -> relation -> relname );
5463
5497
}
0 commit comments