Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 74ae6ab

Browse files
committed
Merge branch 'more_tests' of github.com:postgrespro/postgres_cluster into more_tests
2 parents bf32a65 + bcbc076 commit 74ae6ab

File tree

8 files changed

+140
-112
lines changed

8 files changed

+140
-112
lines changed

contrib/mmts/Cluster.pm

+1-1
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ sub stop
233233
}
234234
}
235235
}
236-
236+
sleep(2);
237237
return $ok;
238238
}
239239

contrib/mmts/bgwpool.c

+31-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "postmaster/bgworker.h"
66
#include "storage/s_lock.h"
77
#include "storage/spin.h"
8+
#include "storage/proc.h"
89
#include "storage/pg_sema.h"
910
#include "storage/shmem.h"
1011
#include "datatype/timestamp.h"
@@ -16,23 +17,41 @@
1617
bool MtmIsLogicalReceiver;
1718
int MtmMaxWorkers;
1819

20+
static BgwPool* pool;
21+
22+
static void BgwShutdownWorker(int sig)
23+
{
24+
BgwPoolStop(pool);
25+
}
26+
1927
static void BgwPoolMainLoop(BgwPool* pool)
2028
{
2129
int size;
2230
void* work;
2331
static PortalData fakePortal;
32+
sigset_t sset;
2433

2534
MtmIsLogicalReceiver = true;
2635

36+
signal(SIGINT, BgwShutdownWorker);
37+
signal(SIGQUIT, BgwShutdownWorker);
38+
signal(SIGTERM, BgwShutdownWorker);
39+
40+
sigfillset(&sset);
41+
sigprocmask(SIG_UNBLOCK, &sset, NULL);
42+
2743
BackgroundWorkerUnblockSignals();
2844
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
2945
ActivePortal = &fakePortal;
3046
ActivePortal->status = PORTAL_ACTIVE;
3147
ActivePortal->sourceText = "";
3248

33-
while(true) {
49+
while (true) {
3450
PGSemaphoreLock(&pool->available);
3551
SpinLockAcquire(&pool->lock);
52+
if (pool->shutdown) {
53+
break;
54+
}
3655
size = *(int*)&pool->queue[pool->head];
3756
Assert(size < pool->size);
3857
work = malloc(size);
@@ -64,6 +83,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
6483
pool->lastPeakTime = 0;
6584
SpinLockRelease(&pool->lock);
6685
}
86+
SpinLockRelease(&pool->lock);
6787
}
6888

6989
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers)
@@ -75,6 +95,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
7595
PGSemaphoreReset(&pool->available);
7696
PGSemaphoreReset(&pool->overflow);
7797
SpinLockInit(&pool->lock);
98+
pool->shutdown = false;
7899
pool->producerBlocked = false;
79100
pool->head = 0;
80101
pool->tail = 0;
@@ -167,7 +188,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
167188
}
168189

169190
SpinLockAcquire(&pool->lock);
170-
while (true) {
191+
while (!pool->shutdown) {
171192
if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
172193
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
173194
{
@@ -204,3 +225,11 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
204225
SpinLockRelease(&pool->lock);
205226
}
206227

228+
/*
 * Request shutdown of a worker pool.
 *
 * Sets pool->shutdown while holding pool->lock, then posts the `available`
 * and `overflow` semaphores once each.
 *
 * pool - the pool to stop; dereferenced unconditionally, so it must not be NULL.
 */
void BgwPoolStop(BgwPool* pool)
229+
{
230+
SpinLockAcquire(&pool->lock);
231+
pool->shutdown = true; /* tested by BgwPoolMainLoop after it takes the lock, and by the BgwPoolExecute wait loop */
232+
SpinLockRelease(&pool->lock);
233+
PGSemaphoreUnlock(&pool->available); /* lets a worker blocked in PGSemaphoreLock(&pool->available) wake up and see the flag */
234+
PGSemaphoreUnlock(&pool->overflow); /* presumably releases a producer waiting on a full queue - that wait is not visible here, TODO confirm */
235+
}

contrib/mmts/bgwpool.h

+2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ typedef struct
3434
time_t lastPeakTime;
3535
timestamp_t lastDynamicWorkerStartTime;
3636
bool producerBlocked;
37+
bool shutdown;
3738
char dbname[MAX_DBNAME_LEN];
3839
char dbuser[MAX_DBUSER_LEN];
3940
char* queue;
@@ -51,4 +52,5 @@ extern size_t BgwPoolGetQueueSize(BgwPool* pool);
5152

5253
extern timestamp_t BgwGetLastPeekTime(BgwPool* pool);
5354

55+
extern void BgwPoolStop(BgwPool* pool);
5456
#endif

contrib/mmts/multimaster.c

+33-19
Original file line numberDiff line numberDiff line change
@@ -583,7 +583,9 @@ MtmAdjustOldestXid(TransactionId xid)
583583

584584
for (ts = Mtm->transListHead;
585585
ts != NULL
586+
&& (ts->status == TRANSACTION_STATUS_ABORTED || ts->status == TRANSACTION_STATUS_COMMITTED)
586587
&& ts->csn < oldestSnapshot
588+
&& !ts->isPinned
587589
&& TransactionIdPrecedes(ts->xid, xid);
588590
prev = ts, ts = ts->next)
589591
{
@@ -653,6 +655,7 @@ static void MtmAddSubtransactions(MtmTransState* ts, TransactionId* subxids, int
653655
sts = (MtmTransState*)hash_search(MtmXid2State, &subxids[i], HASH_ENTER, &found);
654656
Assert(!found);
655657
sts->isActive = false;
658+
sts->isPinned = false;
656659
sts->status = ts->status;
657660
sts->csn = ts->csn;
658661
sts->votingCompleted = true;
@@ -814,6 +817,7 @@ MtmCreateTransState(MtmCurrentTrans* x)
814817
ts->isLocal = true;
815818
ts->isPrepared = false;
816819
ts->isTwoPhase = x->isTwoPhase;
820+
ts->isPinned = false;
817821
ts->votingCompleted = false;
818822
if (!found) {
819823
ts->isEnqueued = false;
@@ -963,8 +967,13 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
963967
{
964968
int result = 0;
965969
int nConfigChanges = Mtm->nConfigChanges;
966-
timestamp_t elapsed, start = MtmGetSystemTime();
967-
timestamp_t deadline = 0;
970+
timestamp_t prepareTime = ts->csn - ts->snapshot;
971+
timestamp_t timeout = Max(prepareTime + MSEC_TO_USEC(MtmMin2PCTimeout), prepareTime*MtmMax2PCRatio/100);
972+
timestamp_t deadline = MtmGetSystemTime() + timeout;
973+
timestamp_t now;
974+
975+
Assert(ts->csn > ts->snapshot);
976+
968977
/* Wait votes from all nodes until: */
969978
while (!MtmVotingCompleted(ts)
970979
&& (ts->isPrepared || nConfigChanges == Mtm->nConfigChanges))
@@ -980,19 +989,16 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
980989
if (result & WL_LATCH_SET) {
981990
ResetLatch(&MyProc->procLatch);
982991
}
983-
elapsed = MtmGetSystemTime() - start;
992+
now = MtmGetSystemTime();
984993
MtmLock(LW_EXCLUSIVE);
985-
if (deadline == 0 && ts->votedMask != 0) {
986-
deadline = Max(MSEC_TO_USEC(MtmMin2PCTimeout), elapsed*MtmMax2PCRatio/100);
987-
} else {
994+
if (now > deadline) {
988995
if (ts->isPrepared) {
989996
/* resend precommit message */
990997
MtmSend2PCMessage(ts, MSG_PRECOMMIT);
991998
} else {
992-
if (elapsed > deadline) {
993-
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(elapsed));
994-
MtmAbortTransaction(ts);
995-
}
999+
elog(WARNING, "Commit of distributed transaction is canceled because of %ld msec timeout expiration", USEC_TO_MSEC(timeout));
1000+
MtmAbortTransaction(ts);
1001+
break;
9961002
}
9971003
}
9981004
}
@@ -1005,7 +1011,7 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
10051011
} else {
10061012
if (Mtm->status != MTM_ONLINE) {
10071013
elog(WARNING, "Commit of distributed transaction is canceled because node is switched to %s mode", MtmNodeStatusMnem[Mtm->status]);
1008-
} else if (nConfigChanges != Mtm->nConfigChanges) {
1014+
} else {
10091015
elog(WARNING, "Commit of distributed transaction is canceled because cluster configuration was changed");
10101016
}
10111017
MtmAbortTransaction(ts);
@@ -1202,6 +1208,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
12021208
ts->status = TRANSACTION_STATUS_ABORTED;
12031209
ts->isLocal = true;
12041210
ts->isPrepared = false;
1211+
ts->isPinned = false;
12051212
ts->snapshot = x->snapshot;
12061213
ts->isTwoPhase = x->isTwoPhase;
12071214
ts->csn = MtmAssignCSN();
@@ -1280,7 +1287,7 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
12801287
}
12811288
}
12821289

1283-
void MtmBroadcastPollMessage(MtmTransState* ts)
1290+
static void MtmBroadcastPollMessage(MtmTransState* ts)
12841291
{
12851292
int i;
12861293
MtmArbiterMessage msg;
@@ -1293,7 +1300,7 @@ void MtmBroadcastPollMessage(MtmTransState* ts)
12931300

12941301
for (i = 0; i < Mtm->nAllNodes; i++)
12951302
{
1296-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask, i))
1303+
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask, i))
12971304
{
12981305
msg.node = i+1;
12991306
MTM_LOG3("Send request for transaction %s to node %d", msg.gid, msg.node);
@@ -1480,15 +1487,17 @@ static void MtmPollStatusOfPreparedTransactions(int disabledNodeId)
14801487
Assert(ts->gid[0]);
14811488
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
14821489
elog(LOG, "Abort transaction %s because its coordinator is disabled and it is not prepared at node %d", ts->gid, MtmNodeId);
1483-
//MtmUnlock();
1490+
ts->isPinned = true;
1491+
MtmUnlock();
14841492
MtmFinishPreparedTransaction(ts, false);
1485-
//MtmLock(LW_EXCLUSIVE);
1493+
MtmLock(LW_EXCLUSIVE);
1494+
ts->isPinned = false;
14861495
} else {
14871496
MTM_LOG1("Poll state of transaction %d (%s)", ts->xid, ts->gid);
14881497
MtmBroadcastPollMessage(ts);
14891498
}
14901499
} else {
1491-
MTM_LOG2("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
1500+
MTM_LOG1("Skip transaction %d (%s) with status %d gtid.node=%d gtid.xid=%d votedMask=%lx",
14921501
ts->xid, ts->gid, ts->status, ts->gtid.node, ts->gtid.xid, ts->votedMask);
14931502
}
14941503
}
@@ -3216,8 +3225,13 @@ bool MtmFilterTransaction(char* record, int size)
32163225
duplicate = true;
32173226
}
32183227

3219-
MTM_LOG2("%s transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3220-
duplicate ? "Ignore" : "Apply", gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3228+
if (duplicate) {
3229+
MTM_LOG1("Ignore transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3230+
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3231+
} else {
3232+
MTM_LOG2("Apply transaction %s from node %d lsn %lx, flags=%x, origin node %d, original lsn=%lx, current lsn=%lx",
3233+
gid, replication_node, end_lsn, flags, origin_node, origin_lsn, restart_lsn);
3234+
}
32213235
return duplicate;
32223236
}
32233237

@@ -3831,7 +3845,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
38313845
} else {
38323846
CommitTransactionCommand();
38333847
if (x->isSuspended) {
3834-
elog(WARNING, "Transaction %s is left in prepared state because coordinator onde is not online", x->gid);
3848+
elog(WARNING, "Transaction %s is left in prepared state because coordinator node is not online", x->gid);
38353849
} else {
38363850
StartTransactionCommand();
38373851
if (x->status == TRANSACTION_STATUS_ABORTED) {

contrib/mmts/multimaster.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,8 @@ typedef struct MtmTransState
223223
bool isEnqueued; /* Transaction is inserted in queue */
224224
bool isPrepared; /* Transaction is prepared: now it is safe to commit transaction */
225225
bool isActive; /* Transaction is active */
226-
bool isTwoPhase; /* user level 2PC */
226+
bool isTwoPhase; /* User level 2PC */
227+
bool isPinned; /* Transaction is protected from GC */
227228
nodemask_t participantsMask; /* Mask of nodes involved in transaction */
228229
nodemask_t votedMask; /* Mask of voted nodes */
229230
TransactionId xids[1]; /* [Mtm->nAllNodes]: transaction ID at replicas */
@@ -331,7 +332,6 @@ extern void MtmExecutor(void* work, size_t size);
331332
extern void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd);
332333
extern void MtmSendMessage(MtmArbiterMessage* msg);
333334
extern void MtmAdjustSubtransactions(MtmTransState* ts);
334-
extern void MtmBroadcastPollMessage(MtmTransState* ts);
335335
extern void MtmLock(LWLockMode mode);
336336
extern void MtmUnlock(void);
337337
extern void MtmLockNode(int nodeId, LWLockMode mode);

contrib/mmts/tests2/lib/bank_client.py

+19-26
Original file line numberDiff line numberDiff line change
@@ -111,20 +111,24 @@ def status(self):
111111
while self.running:
112112
msg = yield from self.child_pipe.coro_recv()
113113
if msg == 'status':
114-
# print('evloop: got status request')
115-
serialized_aggs = {}
116-
for name, aggregate in self.aggregates.items():
117-
serialized_aggs[name] = aggregate.as_dict()
118-
aggregate.clear_values()
114+
serialized_aggs = []
115+
116+
for conn_id, conn_aggs in self.aggregates.items():
117+
serialized_aggs.append({})
118+
for aggname, agg in conn_aggs.items():
119+
serialized_aggs[conn_id][aggname] = agg.as_dict()
120+
agg.clear_values()
121+
119122
self.child_pipe.send(serialized_aggs)
120-
# print('evloop: sent status response')
121123
else:
122124
print('evloop: unknown message')
123125

124126
@asyncio.coroutine
125127
def exec_tx(self, tx_block, aggname_prefix, conn_i):
126128
aggname = "%s_%i" % (aggname_prefix, conn_i)
127-
agg = self.aggregates[aggname] = MtmTxAggregate(aggname)
129+
if conn_i not in self.aggregates:
130+
self.aggregates[conn_i] = {}
131+
agg = self.aggregates[conn_i][aggname_prefix] = MtmTxAggregate(aggname)
128132
pool = yield from aiopg.create_pool(self.dsns[conn_i])
129133
conn = yield from pool.acquire()
130134
cur = yield from conn.cursor()
@@ -167,8 +171,8 @@ def total_tx(self, conn, cur, agg):
167171
total = yield from cur.fetchone()
168172
if total[0] != 0:
169173
agg.isolation += 1
170-
# print(self.oops)
171-
# print('Isolation error, total = ', total[0])
174+
print(self.oops)
175+
print('Isolation error, total = ', total[0])
172176
# yield from cur.execute('select * from mtm.get_nodes_state()')
173177
# nodes_state = yield from cur.fetchall()
174178
# for i, col in enumerate(self.nodes_state_fields):
@@ -177,7 +181,6 @@ def total_tx(self, conn, cur, agg):
177181
# print("%19s" % nodes_state[j][i], end="\t")
178182
# print("\n")
179183

180-
181184
def run(self):
182185
# asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
183186
self.loop = asyncio.get_event_loop()
@@ -196,13 +199,9 @@ def bgrun(self):
196199
self.evloop_process = multiprocessing.Process(target=self.run, args=())
197200
self.evloop_process.start()
198201

199-
# XXX: introduce periodic report from client?
200202
def get_aggregates(self, print=True):
201-
# print('test: sending status request')
202203
self.parent_pipe.send('status')
203-
# print('test: awaitng status response')
204204
resp = self.parent_pipe.recv()
205-
# print('test: got status response')
206205
if print:
207206
MtmClient.print_aggregates(resp)
208207
return resp
@@ -216,7 +215,7 @@ def stop(self):
216215
self.evloop_process.terminate()
217216

218217
@classmethod
219-
def print_aggregates(cls, serialized_agg):
218+
def print_aggregates(cls, aggs):
220219
columns = ['running_latency', 'max_latency', 'isolation', 'finish']
221220

222221
# print table header
@@ -225,23 +224,17 @@ def print_aggregates(cls, serialized_agg):
225224
print(col, end="\t")
226225
print("\n", end="")
227226

228-
serialized_agg
229-
230-
for aggname in sorted(serialized_agg.keys()):
231-
agg = serialized_agg[aggname]
232-
print("%s\t" % aggname, end="")
233-
for col in columns:
234-
if col in agg:
227+
for conn_id, agg_conn in enumerate(aggs):
228+
for aggname, agg in agg_conn.items():
229+
print("Node %d: %s\t" % (conn_id + 1, aggname), end="")
230+
for col in columns:
235231
if isinstance(agg[col], float):
236232
print("%.2f\t" % (agg[col],), end="\t")
237233
else:
238234
print(agg[col], end="\t")
239-
else:
240-
print("-\t", end="")
241-
print("")
235+
print("")
242236
print("")
243237

244-
245238
if __name__ == "__main__":
246239
c = MtmClient(['dbname=postgres user=postgres host=127.0.0.1',
247240
'dbname=postgres user=postgres host=127.0.0.1 port=5433',

0 commit comments

Comments
 (0)