Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5aafae3

Browse files
knizhnikkelvich
authored andcommitted
origin patch
1 parent b3f24e7 commit 5aafae3

File tree

3 files changed

+37
-24
lines changed

3 files changed

+37
-24
lines changed

multimaster.c

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ typedef struct {
8080

8181
typedef enum
8282
{
83-
MTM_STATE_LOCK_ID,
84-
N_LOCKS
83+
MTM_STATE_LOCK_ID
8584
} MtmLockIds;
8685

8786
#define MTM_SHMEM_SIZE (64*1024*1024)
@@ -208,6 +207,17 @@ void MtmUnlock(void)
208207
#endif
209208
}
210209

210+
void MtmLockNode(int nodeId)
211+
{
212+
Assert(nodeId > 0 && nodeId <= MtmNodes);
213+
LWLockAcquire((LWLockId)&dtm->locks[nodeId], LW_EXCLUSIVE);
214+
}
215+
216+
void MtmUnlockNode(int nodeId)
217+
{
218+
Assert(nodeId > 0 && nodeId <= MtmNodes);
219+
LWLockRelease((LWLockId)&dtm->locks[nodeId]);
220+
}
211221

212222
/*
213223
* -------------------------------------------
@@ -1370,15 +1380,6 @@ _PG_init(void)
13701380
NULL
13711381
);
13721382

1373-
1374-
/*
1375-
* Request additional shared resources. (These are no-ops if we're not in
1376-
* the postmaster process.) We'll allocate or attach to the shared
1377-
* resources in mtm_shmem_startup().
1378-
*/
1379-
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmQueueSize);
1380-
RequestNamedLWLockTranche(MULTIMASTER_NAME, N_LOCKS);
1381-
13821383
MtmNodes = MtmStartReceivers(MtmConnStrs, MtmNodeId);
13831384
if (MtmNodes < 2) {
13841385
elog(ERROR, "Multimaster should have at least two nodes");
@@ -1387,6 +1388,14 @@ _PG_init(void)
13871388
elog(ERROR, "Multimaster with mor than %d nodes is not currently supported", MAX_NODES);
13881389
}
13891390

1391+
/*
1392+
* Request additional shared resources. (These are no-ops if we're not in
1393+
* the postmaster process.) We'll allocate or attach to the shared
1394+
* resources in mtm_shmem_startup().
1395+
*/
1396+
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmQueueSize);
1397+
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmNodes);
1398+
13901399
BgwPoolStart(MtmWorkers, MtmPoolConstructor);
13911400

13921401
MtmArbiterInitialize();

multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,8 @@ extern void MtmSendNotificationMessage(MtmTransState* ts, MtmMessageCode cmd);
154154
extern void MtmAdjustSubtransactions(MtmTransState* ts);
155155
extern void MtmLock(LWLockMode mode);
156156
extern void MtmUnlock(void);
157+
extern void MtmLockNode(int nodeId);
158+
extern void MtmUnlockNode(int nodeId);
157159
extern void MtmDropNode(int nodeId, bool dropSlot);
158160
extern void MtmRecoverNode(int nodeId);
159161
extern void MtmOnNodeDisconnect(int nodeId);

pglogical_apply.c

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ static void process_remote_insert(StringInfo s, Relation rel);
7373
static void process_remote_update(StringInfo s, Relation rel);
7474
static void process_remote_delete(StringInfo s, Relation rel);
7575

76+
static int MtmReplicationNode;
77+
7678
/*
7779
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
7880
*
@@ -465,41 +467,40 @@ read_rel(StringInfo s, LOCKMODE mode)
465467
}
466468

467469
static void
468-
MtmBeginSession(int nodeId)
470+
MtmBeginSession(void)
469471
{
470-
#if 0
471472
char slot_name[MULTIMASTER_MAX_SLOT_NAME_SIZE];
472-
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, nodeId);
473+
MtmLockNode(MtmReplicationNode);
474+
sprintf(slot_name, MULTIMASTER_SLOT_PATTERN, MtmReplicationNode);
473475
Assert(replorigin_session_origin == InvalidRepOriginId);
474476
replorigin_session_origin = replorigin_by_name(slot_name, false);
475-
MTM_INFO("%d: Begin setup replorigin session: %d\n", MyProcPid, replorigin_session_origin);
477+
MTM_TRACE("%d: Begin setup replorigin session: %d\n", MyProcPid, replorigin_session_origin);
476478
replorigin_session_setup(replorigin_session_origin);
477-
MTM_INFO("%d: End setup replorigin session: %d\n", MyProcPid, replorigin_session_origin);
478-
#endif
479+
MTM_TRACE("%d: End setup replorigin session: %d\n", MyProcPid, replorigin_session_origin);
479480
}
480481

481482
static void
482483
MtmEndSession(void)
483484
{
484485
if (replorigin_session_origin != InvalidRepOriginId) {
485-
MTM_INFO("%d: Begin reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
486+
MTM_TRACE("%d: Begin reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
486487
replorigin_session_origin = InvalidRepOriginId;
487488
replorigin_session_reset();
488-
MTM_INFO("%d: End reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
489+
MtmUnlockNode(MtmReplicationNode);
490+
MTM_TRACE("%d: End reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
489491
}
490492
}
491493

492494
static void
493495
process_remote_commit(StringInfo in)
494496
{
495497
uint8 flags;
496-
uint8 nodeId;
497498
csn_t csn;
498499
const char *gid = NULL;
499500

500501
/* read flags */
501502
flags = pq_getmsgbyte(in);
502-
nodeId = pq_getmsgbyte(in);
503+
MtmReplicationNode = pq_getmsgbyte(in);
503504

504505
/* read fields */
505506
replorigin_session_origin_lsn = pq_getmsgint64(in); /* commit_lsn */
@@ -515,7 +516,7 @@ process_remote_commit(StringInfo in)
515516
MTM_TRACE("%d: PGLOGICAL_COMMIT commit\n", MyProcPid);
516517
if (IsTransactionState()) {
517518
Assert(TransactionIdIsValid(MtmGetCurrentTransactionId()));
518-
MtmBeginSession(nodeId);
519+
MtmBeginSession();
519520
CommitTransactionCommand();
520521
}
521522
break;
@@ -529,7 +530,8 @@ process_remote_commit(StringInfo in)
529530
BeginTransactionBlock();
530531
CommitTransactionCommand();
531532
StartTransactionCommand();
532-
MtmBeginSession(nodeId);
533+
534+
MtmBeginSession();
533535
/* PREPARE itself */
534536
MtmSetCurrentTransactionGID(gid);
535537
PrepareTransactionBlock(gid);
@@ -543,7 +545,7 @@ process_remote_commit(StringInfo in)
543545
gid = pq_getmsgstring(in);
544546
MTM_TRACE("%d: PGLOGICAL_COMMIT_PREPARED commit: csn=%ld, gid=%s\n", MyProcPid, csn, gid);
545547
StartTransactionCommand();
546-
MtmBeginSession(nodeId);
548+
MtmBeginSession();
547549
MtmSetCurrentTransactionCSN(csn);
548550
MtmSetCurrentTransactionGID(gid);
549551
FinishPreparedTransaction(gid, true);

0 commit comments

Comments
 (0)