Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit cd6b6d2

Browse files
knizhnikkelvich
authored andcommitted
Update MMTS
1 parent bf78f0b commit cd6b6d2

File tree

3 files changed

+35
-8
lines changed

3 files changed

+35
-8
lines changed

arbiter.c

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ typedef struct
6161

6262
typedef struct
6363
{
64-
DtmCOmmitMessage buf[TX_BUFFER_SIZE];
64+
DtmCommitMessage data[TX_BUFFER_SIZE];
6565
int used;
6666
} DtmTxBuffer;
6767

6868
static int* sockets;
69-
static DtmCommitMessage** txBuffers;
69+
static DtmTxBuffer* txBuffers;
7070

7171
static BackgroundWorker DtmSender = {
7272
"mm-sender",
@@ -216,18 +216,40 @@ static void acceptConnections()
216216

217217
static void DtmTransSender(Datum arg)
218218
{
219-
txBuffer = (DtmCommitMessage*)
219+
int nNodes = dtm->nNodes;
220+
int i;
221+
DtmCommitMessage* txBuffer = (DtmCommitMessage*)palloc(sizeof(DtmTxBuffer)*(nNodes));
222+
220223
openConnections();
221224

225+
for (i = 0; i < nNodes; i++) {
226+
txBuffer[i].used = 0;
227+
}
228+
222229
while (true) {
223-
DtmTransState* ts;
230+
DtmTransState* ts;
224231
PGSemaphoreLock(&dtm->semphore);
225232

226-
LWLockAcquire(&dtm->hashLock, LW_EXCLUSIVE);
233+
SpinLockAcquire(&dtm->spinlock);
234+
ts = dtm->pendingTransactions;
235+
dtm->pendingTransactions = NULL;
236+
SpinLockRelease(&dtm->spinlock);
237+
227238
for (ts = dtm->pendingTransactions; ts != NULL; ts = ts->nextPending) {
228239
int node = ts->gtid.node;
229240
Assert(node != MMNodeId);
230-
sockets
241+
node -= 1;
242+
if (txBuffer[node].used == TX_BUFFER_SIZE) {
243+
WriteSocket(sockets[node], txBuffer[node].data, txBuffer[node].used*sizeof(DtmCommitRequest));
244+
txBuffer[node].used = 0;
245+
}
246+
txBuffer[node].data[txBuffer[node].used].xid = ts->xid;
247+
txBuffer[node].data[txBuffer[node].used].csn = ts->csn;
248+
txBuffer[node].used += 1;
249+
}
250+
dtm->pendingTransactions = NULL;
251+
252+
}
231253
}
232254

233255
static void DtmTransReceiver(Datum arg)

multimaster.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -983,14 +983,17 @@ void MMVoteForTransaction(DtmTransState* ts)
983983
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
984984
ResetLatch(&MyProc->procLatch);
985985
}
986-
LWLockAcquire(&dtm->hashLock< LW_EXCLUSIVE);
987986
} else {
988-
/* I am replica: firrst notify master... */
987+
/* I am replica: first notify master... */
988+
SpinLockAcquire(&dtm->spinlock);
989989
ts->nextPending = dtm->pendingTransactions;
990990
dtm->pendingTransactions = ts;
991+
SpinLockRelease(&dtm->spinlock);
992+
991993
PGSemaphoreUnlock(&dtm->semapahore);
992994
/* ... and wait reposnse from it */
993995
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
994996
ResetLatch(&MyProc->procLatch);
995997
}
998+
LWLockAcquire(&dtm->hashLock< LW_EXCLUSIVE);
996999
}

multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ typedef struct DtmTransState
3232

3333
typedef struct
3434
{
35+
volatile slock_t spinlock;
36+
PGSemaphoreData semaphore;
3537
LWLockId hashLock;
3638
TransactionId minXid; /* XID of oldest transaction visible by any active transaction (local or global) */
3739
int64 disabledNodeMask;

0 commit comments

Comments
 (0)