Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 814af3e

Browse files
knizhnik authored and kelvich committed
Continue work on MMTS
1 parent 5f3f8e2 commit 814af3e

File tree

7 files changed

+77
-51
lines changed

7 files changed

+77
-51
lines changed

arbiter.c

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ static int connectSocket(char const* host, int port)
200200
}
201201
if (rc < 0) {
202202
if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) || max_attempts == 0) {
203-
elog(ERROR, "Sockhub failed to connect to %s:%d: %d", host, port, errno);
203+
elog(ERROR, "Arbiter failed to connect to %s:%d: %d", host, port, errno);
204204
} else {
205205
max_attempts -= 1;
206206
sleep(1);
@@ -236,7 +236,7 @@ static void openConnections()
236236
} else {
237237
connStr = end;
238238
}
239-
sockets[i] = i+1 != MMNodeId ? connectSocket(host, MMArbiterPort + i) : -1;
239+
sockets[i] = i+1 != MMNodeId ? connectSocket(host, MMArbiterPort + i + 1) : -1;
240240
}
241241
}
242242

@@ -260,9 +260,12 @@ static void acceptConnections()
260260
}
261261
setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
262262

263-
if (bind(sd, (struct sockaddr*)&sock_inet, nNodes-1) < 0) {
263+
if (bind(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet)) < 0) {
264264
elog(ERROR, "Failed to bind socket: %d", errno);
265265
}
266+
if (listen(sd, MMNodes-1) < 0) {
267+
elog(ERROR, "Failed to listen socket: %d", errno);
268+
}
266269

267270
for (i = 0; i < nNodes; i++) {
268271
int fd = accept(sd, NULL, NULL);
@@ -332,8 +335,13 @@ static void DtmTransSender(Datum arg)
332335
writeSocket(sockets[i], txBuffer[i].data, txBuffer[i].used*sizeof(DtmCommitMessage));
333336
txBuffer[i].used = 0;
334337
}
338+
DTM_TRACE("Send notification %ld to replica %d from coordinator %d for transaction %d (local transaction %d)\n",
339+
ts->csn, i+1, MMNodeId, ts->xid, ts->xids[i]);
340+
335341
txBuffer[i].data[txBuffer[i].used].dxid = ts->xids[i];
342+
txBuffer[i].data[txBuffer[i].used].sxid = ts->xid;
336343
txBuffer[i].data[txBuffer[i].used].csn = ts->csn;
344+
txBuffer[i].data[txBuffer[i].used].node = MMNodeId;
337345
txBuffer[i].used += 1;
338346
}
339347
}
@@ -344,6 +352,8 @@ static void DtmTransSender(Datum arg)
344352
writeSocket(sockets[i], txBuffer[i].data, txBuffer[i].used*sizeof(DtmCommitMessage));
345353
txBuffer[i].used = 0;
346354
}
355+
DTM_TRACE("Send notification %ld to coordinator %d from node %d for transaction %d (local transaction %d)\n",
356+
ts->csn, ts->gtid.node, MMNodeId, ts->gtid.xid, ts->xid);
347357
txBuffer[i].data[txBuffer[i].used].dxid = ts->gtid.xid;
348358
txBuffer[i].data[txBuffer[i].used].sxid = ts->xid;
349359
txBuffer[i].data[txBuffer[i].used].node = MMNodeId;
@@ -426,6 +436,9 @@ static void DtmTransReceiver(Datum arg)
426436
}
427437
Assert((unsigned)(msg->node-1) <= (unsigned)nNodes);
428438
ts->xids[msg->node-1] = msg->sxid;
439+
DTM_TRACE("Receive response %ld for transaction %d votes %d from node %d (transaction %d)\n",
440+
msg->csn, msg->dxid, ts->nVotes+1, msg->node, msg->sxid);
441+
Assert(ts->nVotes > 0 && ts->nVotes < ds->nNodes);
429442
if (++ts->nVotes == ds->nNodes) {
430443
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
431444
}

multimaster.c

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,14 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
230230
if (ts != NULL)
231231
{
232232
if (ts->csn > dtmTx.snapshot) {
233-
DTM_TRACE((stderr, "%d: tuple with xid=%d(csn=%lld) is invisibile in snapshot %lld\n",
234-
getpid(), xid, ts->csn, dtmTx.snapshot));
233+
DTM_TUPLE_TRACE("%d: tuple with xid=%d(csn=%ld) is invisibile in snapshot %ld\n",
234+
getpid(), xid, ts->csn, dtmTx.snapshot);
235235
LWLockRelease(dtm->hashLock);
236236
return true;
237237
}
238238
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS)
239239
{
240-
DTM_TRACE((stderr, "%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtmTx.snapshot));
240+
DTM_TRACE("%d: wait for in-doubt transaction %u in snapshot %lu\n", getpid(), xid, dtmTx.snapshot);
241241
LWLockRelease(dtm->hashLock);
242242
#if TRACE_SLEEP_TIME
243243
{
@@ -255,7 +255,7 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
255255
if (firstReportTime == 0) {
256256
firstReportTime = now;
257257
} else {
258-
fprintf(stderr, "Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime), maxSleepTime);
258+
DTM_TRACE("Snapshot sleep %lu of %lu usec (%f%%), maximum=%lu\n", totalSleepTime, now - firstReportTime, totalSleepTime*100.0/(now - firstReportTime), maxSleepTime);
259259
}
260260
}
261261
}
@@ -268,15 +268,15 @@ bool DtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
268268
else
269269
{
270270
bool invisible = ts->status != TRANSACTION_STATUS_COMMITTED;
271-
DTM_TRACE((stderr, "%d: tuple with xid=%d(csn= %lld) is %s in snapshot %lld\n",
272-
getpid(), xid, ts->csn, invisible ? "rollbacked" : "committed", dtmTx.snapshot));
271+
DTM_TUPLE_TRACE("%d: tuple with xid=%d(csn= %ld) is %s in snapshot %ld\n",
272+
getpid(), xid, ts->csn, invisible ? "rollbacked" : "committed", dtmTx.snapshot);
273273
LWLockRelease(dtm->hashLock);
274274
return invisible;
275275
}
276276
}
277277
else
278278
{
279-
DTM_TRACE((stderr, "%d: visibility check is skept for transaction %u in snapshot %lu\n", getpid(), xid, dtmTx.snapshot));
279+
DTM_TUPLE_TRACE("%d: visibility check is skept for transaction %u in snapshot %lu\n", getpid(), xid, dtmTx.snapshot);
280280
break;
281281
}
282282
}
@@ -342,14 +342,15 @@ DtmAdjustOldestXid(TransactionId xid)
342342
ts = (DtmTransState*)hash_search(xid2state, &xid, HASH_FIND, NULL);
343343
if (ts != NULL) {
344344
timestamp_t cutoff_time = ts->csn - DtmVacuumDelay*USEC;
345-
345+
#if 0
346346
for (ts = dtm->transListHead; ts != NULL && ts->csn < cutoff_time; prev = ts, ts = ts->next) {
347347
Assert(ts->status == TRANSACTION_STATUS_COMMITTED || ts->status == TRANSACTION_STATUS_ABORTED);
348348
if (prev != NULL) {
349349
/* Remove information about too old transactions */
350350
hash_search(xid2state, &prev->xid, HASH_REMOVE, NULL);
351351
}
352352
}
353+
#endif
353354
}
354355
if (prev != NULL) {
355356
dtm->transListHead = prev;
@@ -398,7 +399,6 @@ static void DtmInitialize()
398399
static void
399400
DtmXactCallback(XactEvent event, void *arg)
400401
{
401-
//XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
402402
switch (event)
403403
{
404404
case XACT_EVENT_START:
@@ -427,7 +427,7 @@ DtmBeginTransaction(DtmCurrentTrans* x)
427427
x->snapshot = dtm_get_csn();
428428
x->gtid.xid = InvalidTransactionId;
429429
LWLockRelease(dtm->hashLock);
430-
DTM_TRACE((stderr, "DtmLocalTransaction: transaction %u uses local snapshot %lu\n", x->xid, x->snapshot));
430+
DTM_TRACE("DtmLocalTransaction: transaction %u uses local snapshot %lu\n", x->xid, x->snapshot);
431431
}
432432
}
433433

@@ -438,6 +438,7 @@ DtmBeginTransaction(DtmCurrentTrans* x)
438438
static void DtmPrepareTransaction(DtmCurrentTrans* x)
439439
{
440440
DtmTransState* ts;
441+
bool found;
441442
int i;
442443

443444
if (!x->isDistributed) {
@@ -448,8 +449,9 @@ static void DtmPrepareTransaction(DtmCurrentTrans* x)
448449
x->xid = GetCurrentTransactionId();
449450
}
450451
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
451-
ts = hash_search(xid2state, &x->xid, HASH_ENTER, NULL);
452-
ts->snapshot = x->isReplicated ? x->snapshot : INVALID_CSN;
452+
ts = hash_search(xid2state, &x->xid, HASH_ENTER, &found);
453+
Assert(!found);
454+
ts->snapshot = x->isReplicated ? INVALID_CSN : x->snapshot;
453455
ts->status = TRANSACTION_STATUS_UNKNOWN;
454456
ts->csn = dtm_get_csn();
455457
ts->procno = MyProc->pgprocno;
@@ -475,6 +477,24 @@ DtmEndTransaction(DtmCurrentTrans* x)
475477
x->gtid.xid = InvalidTransactionId;
476478
}
477479

480+
static void
481+
SendNotificationMessage(DtmTransState* ts)
482+
{
483+
DtmTransState* votingList;
484+
485+
SpinLockAcquire(&dtm->votingSpinlock);
486+
votingList = dtm->votingTransactions;
487+
ts->nextVoting = votingList;
488+
dtm->votingTransactions = ts;
489+
SpinLockRelease(&dtm->votingSpinlock);
490+
DTM_TRACE("Register commit message\n");
491+
if (votingList == NULL) {
492+
/* singal semaphore only once for the whole list */
493+
DTM_TRACE("Signal semaphore\n");
494+
PGSemaphoreUnlock(&dtm->votingSemaphore);
495+
}
496+
}
497+
478498
static XidStatus
479499
DtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
480500
{
@@ -524,12 +544,16 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
524544

525545
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
526546
ts = hash_search(xid2state, &xid, HASH_FIND, NULL);
527-
Assert(ts != NULL); /* should be created by DtmPrepareTransaction */
528-
ts->status = status;
529-
for (i = 0; i < nsubxids; i++) {
530-
ts = ts->next;
547+
if (ts != NULL) { /* should be created by DtmPrepareTransaction */
531548
ts->status = status;
532-
}
549+
for (i = 0; i < nsubxids; i++) {
550+
ts = ts->next;
551+
ts->status = status;
552+
}
553+
if (dtmTx.isReplicated) {
554+
SendNotificationMessage(ts);
555+
}
556+
}
533557
LWLockRelease(dtm->hashLock);
534558
}
535559

@@ -538,19 +562,18 @@ DtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
538562
static void
539563
DtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
540564
{
541-
DTM_INFO("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
542-
if (dtmTx.isDistributed)
565+
DTM_TRACE("%d: DtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
566+
if (xid == dtmTx.xid && dtmTx.isDistributed)
543567
{
544-
Assert(xid == dtmTx.xid);
545568
if (status == TRANSACTION_STATUS_ABORTED || !dtmTx.containsDML)
546569
{
547570
DtmFinishTransaction(xid, nsubxids, subxids, status);
548-
DTM_INFO("Abort transaction %d\n", xid);
571+
DTM_TRACE("Abort transaction %d\n", xid);
549572
}
550573
else
551574
{
552575
if (DtmCommitTransaction(xid, nsubxids, subxids) == TRANSACTION_STATUS_COMMITTED) {
553-
DTM_INFO("Commit transaction %d\n", xid);
576+
DTM_TRACE("Commit transaction %d\n", xid);
554577
} else {
555578
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, TRANSACTION_STATUS_ABORTED, lsn);
556579
dtmTx.isDistributed = false;
@@ -643,7 +666,7 @@ _PG_init(void)
643666
);
644667

645668
DefineCustomIntVariable(
646-
"multimaster.arpiter_port",
669+
"multimaster.arbiter_port",
647670
"Base value for assigning arbiter ports",
648671
NULL,
649672
&MMArbiterPort,
@@ -990,42 +1013,29 @@ MMPoolConstructor(void)
9901013
return &dtm->pool;
9911014
}
9921015

993-
static void
994-
SendCommitMessage(DtmTransState* ts)
995-
{
996-
DtmTransState* votingList;
997-
998-
SpinLockAcquire(&dtm->votingSpinlock);
999-
votingList = dtm->votingTransactions;
1000-
ts->nextVoting = votingList;
1001-
dtm->votingTransactions = ts;
1002-
SpinLockRelease(&dtm->votingSpinlock);
1003-
1004-
if (votingList == NULL) {
1005-
/* singal semaphreo only once for the whole list */
1006-
PGSemaphoreUnlock(&dtm->votingSemaphore);
1007-
}
1008-
}
1009-
10101016
static void
10111017
MMVoteForTransaction(DtmTransState* ts)
10121018
{
10131019
LWLockRelease(dtm->hashLock);
10141020
if (ts->gtid.node == MMNodeId) {
10151021
/* I am coordinator: wait responses from all replicas for transaction replicated using logical decoding */
1022+
DTM_TRACE("Coordinator waiting latch...\n");
10161023
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
10171024
ResetLatch(&MyProc->procLatch);
1025+
DTM_TRACE("Coordinator receive %d votes\n", ts->nVotes);
10181026
Assert(ts->nVotes == dtm->nNodes);
10191027

10201028
/* ... and then send notifications to replicas */
1021-
SendCommitMessage(ts);
1029+
SendNotificationMessage(ts);
10221030
} else {
10231031
/* I am replica: first notify coordinator... */
10241032
ts->nVotes = dtm->nNodes-1; /* I just need one confirmation from coordinator */
1025-
SendCommitMessage(ts);
1033+
SendNotificationMessage(ts);
10261034
/* ... and wait response from it */
1035+
DTM_TRACE("Node %d waiting latch...\n", MMNodeId);
10271036
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
10281037
ResetLatch(&MyProc->procLatch);
1038+
DTM_TRACE("Node %d receive response...\n", MMNodeId);
10291039
}
10301040
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
10311041
}
@@ -1034,6 +1044,7 @@ HTAB* MMCreateHash(void)
10341044
{
10351045
HASHCTL info;
10361046
HTAB* htab;
1047+
Assert(MMNodes > 0);
10371048
memset(&info, 0, sizeof(info));
10381049
info.keysize = sizeof(TransactionId);
10391050
info.entrysize = sizeof(DtmTransState) + (MMNodes-1)*sizeof(TransactionId);

multimaster.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@
44
#include "bytebuf.h"
55
#include "bgwpool.h"
66

7-
#define DTM_TRACE(fmt, ...)
8-
/* #define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__) */
9-
#define DTM_INFO(fmt, ...)
7+
#define DTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8+
//#define DTM_TRACE(fmt, ...)
9+
#define DTM_TUPLE_TRACE(fmt, ...)
1010

1111
#define BIT_SET(mask, bit) ((mask) & ((int64)1 << (bit)))
1212

pglogical_apply.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,10 @@ process_remote_begin(StringInfo s)
330330
gtid.node = pq_getmsgint(s, 4);
331331
gtid.xid = pq_getmsgint(s, 4);
332332
snapshot = pq_getmsgint64(s);
333-
MMJoinTransaction(&gtid, snapshot);
334333
SetCurrentStatementStartTimestamp();
335334
StartTransactionCommand();
335+
MMJoinTransaction(&gtid, snapshot);
336+
fprintf(stderr, "REMOTE begin node=%d xid=%d snapshot=%ld\n", gtid.node, gtid.xid, snapshot);
336337
}
337338

338339
static void

pglogical_proto.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
105105
{
106106
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
107107
csn_t csn = MMTransactionSnapshot(txn->xid);
108+
fprintf(stderr, "pglogical_write_begin %d CSN=%ld\n", txn->xid, csn);
108109
if (csn == INVALID_CSN) {
109110
mm->isLocal = true;
110111
} else {

tests/dtmbench

-26.3 KB
Binary file not shown.

tests/dtmbench.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ void* writer(void* arg)
179179
void initializeDatabase()
180180
{
181181
connection conn(cfg.connections[0]);
182-
182+
#if 0
183183
printf("creating extension\n");
184184
{
185185
nontransaction txn(conn);
@@ -197,7 +197,7 @@ void initializeDatabase()
197197
txn.commit();
198198
}
199199
printf("table t created\n");
200-
200+
#endif
201201
printf("inserting stuff into t\n");
202202
{
203203
work txn(conn);

0 commit comments

Comments
 (0)