Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fc058eb

Browse files
committed
Resolve conflicts
2 parents 7fa827b + df085fc commit fc058eb

File tree

7 files changed

+83
-21
lines changed

7 files changed

+83
-21
lines changed

contrib/mmts/arbiter.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ static void MtmMonitor(Datum arg);
9191
static void MtmSendHeartbeat(void);
9292
static bool MtmSendToNode(int node, void const* buf, int size);
9393

94-
94+
/*
9595
static char const* const messageText[] =
9696
{
9797
"INVALID",
@@ -105,6 +105,7 @@ static char const* const messageText[] =
105105
"POLL_REQUEST",
106106
"POLL_STATUS"
107107
};
108+
*/
108109

109110
static BackgroundWorker MtmSenderWorker = {
110111
"mtm-sender",
@@ -363,7 +364,7 @@ static void MtmSendHeartbeat()
363364
MTM_LOG2("Send heartbeat to node %d with timestamp %ld", i+1, now);
364365
}
365366
} else {
366-
MTM_LOG1("Do not send heartbeat to node %d, busy mask %ld, status %d", i+1, busy_mask, Mtm->status);
367+
MTM_LOG1("Do not send heartbeat to node %d, busy mask %lld, status %d", i+1, (long long) busy_mask, Mtm->status);
367368
}
368369
}
369370
}
@@ -940,7 +941,9 @@ static void MtmReceiver(Datum arg)
940941
CommitTransactionCommand();
941942
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
942943
} else {
943-
elog(LOG, "Receive response %d for transaction %s for node %d, votedMask=%lx, participantsMask=%lx", msg->status, msg->gid, node, ts->votedMask, ts->participantsMask & ~Mtm->disabledNodeMask);
944+
elog(LOG, "Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
945+
msg->status, msg->gid, node, (long long) ts->votedMask,
946+
(long long) (ts->participantsMask & ~Mtm->disabledNodeMask) );
944947
continue;
945948
}
946949
} else if (ts->status == TRANSACTION_STATUS_ABORTED && msg->status == TRANSACTION_STATUS_COMMITTED) {

contrib/mmts/multimaster.c

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ static ExecutorFinish_hook_type PreviousExecutorFinishHook;
232232
static ProcessUtility_hook_type PreviousProcessUtilityHook;
233233
static shmem_startup_hook_type PreviousShmemStartupHook;
234234

235+
static nodemask_t lastKnownMatrix[MAX_NODES];
235236

236237
static void MtmExecutorFinish(QueryDesc *queryDesc);
237238
static void MtmProcessUtility(Node *parsetree, const char *queryString,
@@ -1368,8 +1369,8 @@ static void MtmEnableNode(int nodeId)
13681369
void MtmRecoveryCompleted(void)
13691370
{
13701371
int i;
1371-
MTM_LOG1("Recovery of node %d is completed, disabled mask=%lx, connectivity mask=%lx, live nodes=%d",
1372-
MtmNodeId, Mtm->disabledNodeMask, Mtm->connectivityMask, Mtm->nLiveNodes);
1372+
MTM_LOG1("Recovery of node %d is completed, disabled mask=%llx, connectivity mask=%llx, live nodes=%d",
1373+
MtmNodeId, (long long) Mtm->disabledNodeMask, (long long) Mtm->connectivityMask, Mtm->nLiveNodes);
13731374
MtmLock(LW_EXCLUSIVE);
13741375
Mtm->recoverySlot = 0;
13751376
BIT_CLEAR(Mtm->disabledNodeMask, MtmNodeId-1);
@@ -1563,7 +1564,8 @@ static bool
15631564
MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
15641565
{
15651566
int i, j, n = Mtm->nAllNodes;
1566-
fprintf(stderr, "Connectivity matrix:\n");
1567+
bool changed = false;
1568+
15671569
for (i = 0; i < n; i++) {
15681570
if (i+1 != MtmNodeId) {
15691571
void* data = RaftableGet(psprintf("node-mask-%d", i+1), NULL, NULL, nowait);
@@ -1574,12 +1576,27 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
15741576
} else {
15751577
matrix[i] = Mtm->connectivityMask;
15761578
}
1577-
for (j = 0; j < n; j++) {
1578-
putc(BIT_CHECK(matrix[i], j) ? 'X' : '+', stderr);
1579+
1580+
if (lastKnownMatrix[i] != matrix[i])
1581+
{
1582+
changed = true;
1583+
lastKnownMatrix[i] = matrix[i];
15791584
}
1580-
putc('\n', stderr);
15811585
}
1582-
fputs("-----------------------\n", stderr);
1586+
1587+
/* Print matrix if changed */
1588+
if (changed)
1589+
{
1590+
fprintf(stderr, "Connectivity matrix:\n");
1591+
for (i = 0; i < n; i++)
1592+
{
1593+
for (j = 0; j < n; j++)
1594+
putc(BIT_CHECK(matrix[i], j) ? 'X' : '+', stderr);
1595+
putc('\n', stderr);
1596+
}
1597+
fputs("-----------------------\n", stderr);
1598+
}
1599+
15831600
/* make matrix symetric: required for Bron–Kerbosch algorithm */
15841601
for (i = 0; i < n; i++) {
15851602
for (j = 0; j < i; j++) {
@@ -1588,8 +1605,9 @@ MtmBuildConnectivityMatrix(nodemask_t* matrix, bool nowait)
15881605
}
15891606
matrix[i] &= ~((nodemask_t)1 << i);
15901607
}
1608+
15911609
return true;
1592-
}
1610+
}
15931611

15941612

15951613
/**
@@ -1610,6 +1628,11 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16101628
}
16111629

16121630
clique = MtmFindMaxClique(matrix, Mtm->nAllNodes, &clique_size);
1631+
1632+
if ( clique == (~Mtm->disabledNodeMask & (((nodemask_t)1 << Mtm->nAllNodes)-1)) )
1633+
/* Nothing is changed */
1634+
return false;
1635+
16131636
if (clique_size >= Mtm->nAllNodes/2+1) { /* have quorum */
16141637
fprintf(stderr, "Old mask: ");
16151638
for (i = 0; i < Mtm->nAllNodes; i++) {
@@ -1648,7 +1671,7 @@ bool MtmRefreshClusterStatus(bool nowait, int testNodeId)
16481671
/* Interrupt voting for active transaction and abort them */
16491672
for (ts = Mtm->transListHead; ts != NULL; ts = ts->next) {
16501673
MTM_LOG3("Active transaction gid='%s', coordinator=%d, xid=%d, status=%d, gtid.xid=%d",
1651-
ts->gid, ts->gtid.nхode, ts->xid, ts->status, ts->gtid.xid);
1674+
ts->gid, ts->gtid.node, ts->xid, ts->status, ts->gtid.xid);
16521675
if (MtmIsCoordinator(ts)) {
16531676
if (!ts->votingCompleted && disabled != 0 && ts->status != TRANSACTION_STATUS_ABORTED) {
16541677
MtmAbortTransaction(ts);
@@ -1705,7 +1728,7 @@ void MtmOnNodeDisconnect(int nodeId)
17051728
MtmLock(LW_EXCLUSIVE);
17061729
BIT_SET(Mtm->connectivityMask, nodeId-1);
17071730
BIT_SET(Mtm->reconnectMask, nodeId-1);
1708-
MTM_LOG1("Disconnect node %d connectivity mask %lx", nodeId, Mtm->connectivityMask);
1731+
MTM_LOG1("Disconnect node %d connectivity mask %llx", nodeId, (long long) Mtm->connectivityMask);
17091732
MtmUnlock();
17101733

17111734
if (!RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false))
@@ -1755,7 +1778,7 @@ void MtmOnNodeConnect(int nodeId)
17551778
BIT_CLEAR(Mtm->reconnectMask, nodeId-1);
17561779
MtmUnlock();
17571780

1758-
MTM_LOG1("Reconnect node %d, connectivityMask=%lx", nodeId, Mtm->connectivityMask);
1781+
MTM_LOG1("Reconnect node %d, connectivityMask=%llx", nodeId, (long long) Mtm->connectivityMask);
17591782
RaftableSet(psprintf("node-mask-%d", MtmNodeId), &Mtm->connectivityMask, sizeof Mtm->connectivityMask, false);
17601783
}
17611784

@@ -3579,7 +3602,12 @@ static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
35793602
hash_search(MtmGucHash, key, HASH_REMOVE, NULL);
35803603
}
35813604
break;
3605+
35823606
case VAR_RESET_ALL:
3607+
{
3608+
hash_destroy(MtmGucHash);
3609+
MtmGucHashInit();
3610+
}
35833611
break;
35843612

35853613
case VAR_SET_MULTI:
@@ -3591,7 +3619,11 @@ static void MtmGucSet(VariableSetStmt *stmt, const char *queryStr)
35913619

35923620
static void MtmGucDiscard(DiscardStmt *stmt)
35933621
{
3594-
3622+
if (stmt->target == DISCARD_ALL)
3623+
{
3624+
hash_destroy(MtmGucHash);
3625+
MtmGucHashInit();
3626+
}
35953627
}
35963628

35973629
static void MtmGucClear(void)
@@ -3616,7 +3648,18 @@ static char * MtmGucSerialize(void)
36163648
appendStringInfoString(serialized_gucs, "SET ");
36173649
appendStringInfoString(serialized_gucs, hentry->key);
36183650
appendStringInfoString(serialized_gucs, " TO ");
3619-
appendStringInfoString(serialized_gucs, hentry->value);
3651+
3652+
/* quite a crutch */
3653+
if (strcmp(hentry->key, "work_mem") == 0)
3654+
{
3655+
appendStringInfoString(serialized_gucs, "'");
3656+
appendStringInfoString(serialized_gucs, hentry->value);
3657+
appendStringInfoString(serialized_gucs, "'");
3658+
}
3659+
else
3660+
{
3661+
appendStringInfoString(serialized_gucs, hentry->value);
3662+
}
36203663
appendStringInfoString(serialized_gucs, "; ");
36213664
}
36223665
}
@@ -3838,6 +3881,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38383881
{
38393882
MTM_LOG1("Xact accessed temp table, stopping replication");
38403883
MtmTx.isDistributed = false; /* Skip */
3884+
MtmTx.snapshot = INVALID_CSN;
38413885
}
38423886

38433887
}

contrib/mmts/pglogical_apply.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,8 @@ process_remote_insert(StringInfo s, Relation rel)
680680
ScanKey *index_keys;
681681
int i;
682682

683+
PushActiveSnapshot(GetTransactionSnapshot());
684+
683685
estate = create_rel_estate(rel);
684686
newslot = ExecInitExtraTupleSlot(estate);
685687
oldslot = ExecInitExtraTupleSlot(estate);
@@ -755,6 +757,9 @@ process_remote_insert(StringInfo s, Relation rel)
755757

756758
ExecCloseIndices(estate->es_result_relation_info);
757759

760+
if (ActiveSnapshotSet())
761+
PopActiveSnapshot();
762+
758763
heap_close(rel, NoLock);
759764
ExecResetTupleTable(estate->es_tupleTable, true);
760765
FreeExecutorState(estate);

contrib/mmts/pglogical_proto.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,14 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
112112

113113
if (!isRecovery && csn == INVALID_CSN) {
114114
MtmIsFilteredTxn = true;
115-
} else {
115+
MTM_LOG3("%d: pglogical_write_begin XID=%d filtered", MyProcPid, txn->xid);
116+
} else {
117+
MTM_LOG3("%d: pglogical_write_begin XID=%d sent", MyProcPid, txn->xid);
118+
MtmIsFilteredTxn = false;
116119
pq_sendbyte(out, 'B'); /* BEGIN */
117120
pq_sendint(out, MtmNodeId, 4);
118121
pq_sendint(out, isRecovery ? InvalidTransactionId : txn->xid, 4);
119122
pq_sendint64(out, csn);
120-
MtmIsFilteredTxn = false;
121123
MtmTransactionRecords = 0;
122124
}
123125
}
@@ -128,6 +130,12 @@ pglogical_write_message(StringInfo out,
128130
{
129131
if (*prefix == 'L') {
130132
MTM_LOG1("Send deadlock message to node %d", MtmReplicationNodeId);
133+
} else {
134+
if (MtmIsFilteredTxn)
135+
{
136+
MTM_LOG3("%d: pglogical_write_message filtered", MyProcPid);
137+
return;
138+
}
131139
}
132140
pq_sendbyte(out, *prefix);
133141
pq_sendint(out, sz, 4);

contrib/raftable/raftable.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ static RaftableMessage *raftable_try_query(RaftableMessage *msg, size_t size, si
332332
RaftableMessage *answer;
333333

334334
s = get_connection(timeout);
335-
if (s < 0) return false;
335+
if (s < 0) return NULL;
336336

337337
if (timeout_happened(timeout))
338338
{
@@ -619,7 +619,7 @@ pid_t raftable_start(int id)
619619
snprintf(worker.bgw_name, BGW_MAXLEN, "raftable worker %d", sharedcfg->id);
620620
worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
621621
worker.bgw_start_time = BgWorkerStart_ConsistentState;
622-
worker.bgw_restart_time = BGW_NEVER_RESTART;
622+
worker.bgw_restart_time = RAFTABLE_RESTART_TIMEOUT;
623623
worker.bgw_main = raftable_worker_main;
624624
worker.bgw_main_arg = PointerGetDatum(&sharedcfg);
625625

contrib/raftable/raftable.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#ifndef __RAFTABLE_H__
22
#define __RAFTABLE_H__
33

4+
#define RAFTABLE_RESTART_TIMEOUT 1
5+
46
/*
57
* Gets value by key. Returns the value or NULL if not found. Gives up after
68
* 'timeout_ms' milliseconds

src/test/regress/serial_schedule

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ test: triggers
7070
test: inherit
7171
test: create_table_like
7272
test: typed_table
73-
# test: vacuum # issue#18
73+
test: vacuum
7474
test: drop_if_exists
7575
test: updatable_views
7676
test: rolenames

0 commit comments

Comments
 (0)