@@ -344,16 +344,16 @@ process_remote_begin(StringInfo s)
344
344
nodemask_t participantsMask ;
345
345
int rc ;
346
346
347
- gtid .node = pq_getmsgint (s , 4 );
348
- gtid .xid = pq_getmsgint64 (s );
349
- snapshot = pq_getmsgint64 (s );
347
+ gtid .node = pq_getmsgint (s , 4 );
348
+ gtid .xid = pq_getmsgint64 (s );
349
+ snapshot = pq_getmsgint64 (s );
350
350
participantsMask = pq_getmsgint64 (s );
351
351
Assert (gtid .node > 0 );
352
352
353
353
MTM_LOG2 ("REMOTE begin node=%d xid=%llu snapshot=%lld participantsMask=%llx" , gtid .node , (long64 )gtid .xid , snapshot , participantsMask );
354
- MtmResetTransaction ();
354
+ MtmResetTransaction ();
355
355
356
- SetCurrentStatementStartTimestamp ();
356
+ SetCurrentStatementStartTimestamp ();
357
357
StartTransactionCommand ();
358
358
MtmJoinTransaction (& gtid , snapshot , participantsMask );
359
359
@@ -362,7 +362,7 @@ process_remote_begin(StringInfo s)
362
362
GucAltered = false;
363
363
rc = SPI_execute ("RESET SESSION AUTHORIZATION; reset all;" , false, 0 );
364
364
SPI_finish ();
365
- if (rc < 0 ) {
365
+ if (rc < 0 ) {
366
366
MTM_ELOG (ERROR , "Failed to set reset context: %d" , rc );
367
367
}
368
368
}
@@ -403,13 +403,13 @@ process_remote_message(StringInfo s)
403
403
404
404
rc = SPI_execute (messageBody , false, 0 );
405
405
SPI_finish ();
406
- if (rc < 0 ) {
406
+ if (rc < 0 ) {
407
407
MTM_ELOG (ERROR , "Failed to execute utility statement %s" , messageBody );
408
- } else {
408
+ } else {
409
409
MemoryContextSwitchTo (MtmApplyContext );
410
410
PushActiveSnapshot (GetTransactionSnapshot ());
411
411
412
- if (MtmVacuumStmt != NULL ) {
412
+ if (MtmVacuumStmt != NULL ) {
413
413
ExecVacuum (MtmVacuumStmt , 1 );
414
414
} else if (MtmIndexStmt != NULL ) {
415
415
Oid relid = RangeVarGetRelidExtended (MtmIndexStmt -> relation , ShareUpdateExclusiveLock ,
@@ -426,7 +426,7 @@ process_remote_message(StringInfo s)
426
426
true, /* check_rights */
427
427
false, /* skip_build */
428
428
false); /* quiet */
429
-
429
+
430
430
}
431
431
else if (MtmDropStmt != NULL )
432
432
{
@@ -449,7 +449,7 @@ process_remote_message(StringInfo s)
449
449
if (ActiveSnapshotSet ())
450
450
PopActiveSnapshot ();
451
451
}
452
- if (standalone ) {
452
+ if (standalone ) {
453
453
CommitTransactionCommand ();
454
454
}
455
455
break ;
@@ -462,18 +462,18 @@ process_remote_message(StringInfo s)
462
462
/* This function is called directly by receiver, so there is no race condition and we can update
463
463
* restartLSN without locks
464
464
*/
465
- if (origin_node == MtmReplicationNodeId ) {
465
+ if (origin_node == MtmReplicationNodeId ) {
466
466
Assert (msg -> origin_lsn == INVALID_LSN );
467
467
msg -> origin_lsn = MtmSenderWalEnd ;
468
468
}
469
- if (Mtm -> nodes [origin_node - 1 ].restartLSN < msg -> origin_lsn ) {
469
+ if (Mtm -> nodes [origin_node - 1 ].restartLSN < msg -> origin_lsn ) {
470
470
MTM_LOG1 ("Receive logical abort message for transaction %s from node %d: %llx < %llx" , msg -> gid , origin_node , Mtm -> nodes [origin_node - 1 ].restartLSN , msg -> origin_lsn );
471
471
Mtm -> nodes [origin_node - 1 ].restartLSN = msg -> origin_lsn ;
472
- replorigin_session_origin_lsn = msg -> origin_lsn ;
472
+ replorigin_session_origin_lsn = msg -> origin_lsn ;
473
473
MtmRollbackPreparedTransaction (origin_node , msg -> gid );
474
- } else {
475
- if (msg -> origin_lsn != INVALID_LSN ) {
476
- MTM_LOG1 ("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx" ,
474
+ } else {
475
+ if (msg -> origin_lsn != INVALID_LSN ) {
476
+ MTM_LOG1 ("Ignore rollback of transaction %s from node %d because it's LSN %llx <= %llx" ,
477
477
msg -> gid , origin_node , msg -> origin_lsn , Mtm -> nodes [origin_node - 1 ].restartLSN );
478
478
}
479
479
}
@@ -498,7 +498,7 @@ process_remote_message(StringInfo s)
498
498
}
499
499
return standalone ;
500
500
}
501
-
501
+
502
502
static void
503
503
read_tuple_parts (StringInfo s , Relation rel , TupleData * tup )
504
504
{
@@ -529,7 +529,7 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
529
529
const char * data ;
530
530
int len ;
531
531
532
- if (att -> atttypid == InvalidOid ) {
532
+ if (att -> atttypid == InvalidOid ) {
533
533
continue ;
534
534
}
535
535
@@ -612,13 +612,13 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
612
612
static void
613
613
close_rel (Relation rel )
614
614
{
615
- if (rel != NULL )
615
+ if (rel != NULL )
616
616
{
617
- heap_close (rel , NoLock );
618
- }
617
+ heap_close (rel , NoLock );
618
+ }
619
619
}
620
620
621
- static Relation
621
+ static Relation
622
622
read_rel (StringInfo s , LOCKMODE mode )
623
623
{
624
624
int relnamelen ;
@@ -629,20 +629,20 @@ read_rel(StringInfo s, LOCKMODE mode)
629
629
MemoryContext old_context ;
630
630
631
631
local_relid = pglogical_relid_map_get (remote_relid );
632
- if (local_relid == InvalidOid ) {
632
+ if (local_relid == InvalidOid ) {
633
633
rv = makeNode (RangeVar );
634
634
635
635
nspnamelen = pq_getmsgbyte (s );
636
636
rv -> schemaname = (char * ) pq_getmsgbytes (s , nspnamelen );
637
-
637
+
638
638
relnamelen = pq_getmsgbyte (s );
639
639
rv -> relname = (char * ) pq_getmsgbytes (s , relnamelen );
640
-
640
+
641
641
local_relid = RangeVarGetRelidExtended (rv , mode , false, false, NULL , NULL );
642
642
old_context = MemoryContextSwitchTo (TopMemoryContext );
643
643
pglogical_relid_map_put (remote_relid , local_relid );
644
644
MemoryContextSwitchTo (old_context );
645
- } else {
645
+ } else {
646
646
nspnamelen = pq_getmsgbyte (s );
647
647
s -> cursor += nspnamelen ;
648
648
relnamelen = pq_getmsgbyte (s );
@@ -707,29 +707,29 @@ process_remote_commit(StringInfo in)
707
707
Assert (IsTransactionState () && TransactionIdIsValid (MtmGetCurrentTransactionId ()));
708
708
strncpy (gid , pq_getmsgstring (in ), sizeof gid );
709
709
MTM_LOG2 ("%d: PGLOGICAL_PREPARE %s, (%llx,%llx,%llx)" , MyProcPid , gid , commit_lsn , end_lsn , origin_lsn );
710
- if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_IN_PROGRESS ) == TRANSACTION_STATUS_ABORTED ) {
711
- MTM_LOG1 ("Avoid prepare of previously aborted global transaction %s" , gid );
710
+ if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_IN_PROGRESS ) == TRANSACTION_STATUS_ABORTED ) {
711
+ MTM_LOG1 ("Avoid prepare of previously aborted global transaction %s" , gid );
712
712
AbortCurrentTransaction ();
713
- } else {
713
+ } else {
714
714
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
715
715
BeginTransactionBlock (false);
716
716
CommitTransactionCommand ();
717
717
StartTransactionCommand ();
718
-
718
+
719
719
MtmBeginSession (origin_node );
720
720
/* PREPARE itself */
721
721
MtmSetCurrentTransactionGID (gid );
722
722
PrepareTransactionBlock (gid );
723
723
CommitTransactionCommand ();
724
724
725
- if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_UNKNOWN ) == TRANSACTION_STATUS_ABORTED ) {
726
- MTM_LOG1 ("Perform delayed rollback of prepared global transaction %s" , gid );
725
+ if (MtmExchangeGlobalTransactionStatus (gid , TRANSACTION_STATUS_UNKNOWN ) == TRANSACTION_STATUS_ABORTED ) {
726
+ MTM_LOG1 ("Perform delayed rollback of prepared global transaction %s" , gid );
727
727
StartTransactionCommand ();
728
728
MtmSetCurrentTransactionGID (gid );
729
729
FinishPreparedTransaction (gid , false);
730
- CommitTransactionCommand ();
730
+ CommitTransactionCommand ();
731
731
Assert (!MtmTransIsActive ());
732
- }
732
+ }
733
733
MtmEndSession (origin_node , true);
734
734
}
735
735
break ;
@@ -771,7 +771,7 @@ process_remote_commit(StringInfo in)
771
771
default :
772
772
Assert (false);
773
773
}
774
- if (Mtm -> status == MTM_RECOVERY ) {
774
+ if (Mtm -> status == MTM_RECOVERY ) {
775
775
MTM_LOG1 ("Recover transaction %s event=%d" , gid , event );
776
776
}
777
777
MtmUpdateLsnMapping (MtmReplicationNodeId , end_lsn );
@@ -871,12 +871,12 @@ process_remote_insert(StringInfo s, Relation rel)
871
871
if (strcmp (RelationGetRelationName (rel ), MULTIMASTER_LOCAL_TABLES_TABLE ) == 0 &&
872
872
strcmp (get_namespace_name (RelationGetNamespace (rel )), MULTIMASTER_SCHEMA_NAME ) == 0 )
873
873
{
874
- MtmMakeTableLocal (( char * ) DatumGetPointer ( new_tuple .values [0 ]), ( char * ) DatumGetPointer (new_tuple .values [1 ]));
874
+ MtmMakeTableLocal (TextDatumGetCString ( new_tuple .values [0 ]), TextDatumGetCString (new_tuple .values [1 ]));
875
875
}
876
-
876
+
877
877
ExecResetTupleTable (estate -> es_tupleTable , true);
878
878
FreeExecutorState (estate );
879
-
879
+
880
880
CommandCounterIncrement ();
881
881
}
882
882
@@ -989,12 +989,12 @@ process_remote_update(StringInfo s, Relation rel)
989
989
errdetail ("Most likely we have DELETE-UPDATE conflict" )));
990
990
991
991
}
992
-
992
+
993
993
PopActiveSnapshot ();
994
-
994
+
995
995
/* release locks upon commit */
996
996
index_close (idxrel , NoLock );
997
-
997
+
998
998
ExecResetTupleTable (estate -> es_tupleTable , true);
999
999
FreeExecutorState (estate );
1000
1000
@@ -1089,7 +1089,7 @@ void MtmExecutor(void* work, size_t size)
1089
1089
s .len = size ;
1090
1090
s .maxlen = -1 ;
1091
1091
s .cursor = 0 ;
1092
-
1092
+
1093
1093
if (MtmApplyContext == NULL ) {
1094
1094
MtmApplyContext = AllocSetContextCreate (TopMemoryContext ,
1095
1095
"ApplyContext" ,
@@ -1100,15 +1100,15 @@ void MtmExecutor(void* work, size_t size)
1100
1100
top_context = MemoryContextSwitchTo (MtmApplyContext );
1101
1101
replorigin_session_origin = InvalidRepOriginId ;
1102
1102
PG_TRY ();
1103
- {
1103
+ {
1104
1104
bool inside_transaction = true;
1105
- do {
1105
+ do {
1106
1106
char action = pq_getmsgbyte (& s );
1107
1107
old_context = MemoryContextSwitchTo (MtmApplyContext );
1108
-
1108
+
1109
1109
MTM_LOG2 ("%d: REMOTE process action %c" , MyProcPid , action );
1110
1110
#if 0
1111
- if (Mtm -> status == MTM_RECOVERY ) {
1111
+ if (Mtm -> status == MTM_RECOVERY ) {
1112
1112
MTM_LOG1 ("Replay action %c[%x]" , action , s .data [s .cursor ]);
1113
1113
}
1114
1114
#endif
@@ -1150,7 +1150,7 @@ void MtmExecutor(void* work, size_t size)
1150
1150
}
1151
1151
case '(' :
1152
1152
{
1153
- size_t size = pq_getmsgint (& s , 4 );
1153
+ size_t size = pq_getmsgint (& s , 4 );
1154
1154
s .data = MemoryContextAlloc (TopMemoryContext , size );
1155
1155
save_cursor = s .cursor ;
1156
1156
save_len = s .len ;
@@ -1175,10 +1175,10 @@ void MtmExecutor(void* work, size_t size)
1175
1175
relid = RelationGetRelid (rel );
1176
1176
close_rel (rel );
1177
1177
rel = NULL ;
1178
- next = pq_getmsgint64 (& s );
1178
+ next = pq_getmsgint64 (& s );
1179
1179
AdjustSequence (relid , next );
1180
1180
break ;
1181
- }
1181
+ }
1182
1182
case '0' :
1183
1183
Assert (rel != NULL );
1184
1184
heap_truncate_one_rel (rel );
@@ -1198,7 +1198,7 @@ void MtmExecutor(void* work, size_t size)
1198
1198
}
1199
1199
default :
1200
1200
MTM_ELOG (ERROR , "unknown action of type %c" , action );
1201
- }
1201
+ }
1202
1202
MemoryContextSwitchTo (old_context );
1203
1203
MemoryContextResetAndDeleteChildren (MtmApplyContext );
1204
1204
} while (inside_transaction );
@@ -1217,15 +1217,16 @@ void MtmExecutor(void* work, size_t size)
1217
1217
MTM_LOG2 ("%d: REMOTE end abort transaction %llu" , MyProcPid , (long64 )MtmGetCurrentTransactionId ());
1218
1218
}
1219
1219
PG_END_TRY ();
1220
- if (s .data != work ) {
1220
+ if (s .data != work ) {
1221
1221
pfree (s .data );
1222
1222
}
1223
1223
#if 0 /* spill file is expecrted to be closed by tranaction commit or rollback */
1224
- if (spill_file >= 0 ) {
1224
+ if (spill_file >= 0 ) {
1225
1225
MtmCloseSpillFile (spill_file );
1226
1226
}
1227
1227
#endif
1228
1228
MemoryContextSwitchTo (top_context );
1229
1229
MemoryContextResetAndDeleteChildren (MtmApplyContext );
1230
1230
MtmReleaseLocks ();
1231
1231
}
1232
+
0 commit comments