Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 771d3b8

Browse files
committed
move all lwlocks to shared structs -- windows compat
1 parent 3d9f462 commit 771d3b8

11 files changed

+105
-94
lines changed

src/commit.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ MtmTwoPhaseCommit()
247247
while (Mtm->stop_new_commits)
248248
MtmSleep(USECS_PER_SEC);
249249

250-
LWLockAcquire(MtmCommitBarrier, LW_SHARED);
250+
LWLockAcquire(Mtm->commit_barrier, LW_SHARED);
251251

252252
participants = MtmGetEnabledNodeMask() &
253253
~((nodemask_t)1 << (mtm_cfg->my_node_id-1));
@@ -291,7 +291,7 @@ MtmTwoPhaseCommit()
291291
// XXX: make this conditional
292292
gather(participants, messages, &n_messages);
293293

294-
LWLockRelease(MtmCommitBarrier);
294+
LWLockRelease(Mtm->commit_barrier);
295295

296296
dmq_stream_unsubscribe(gid);
297297
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, gid);

src/ddl.c

+21-9
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@
5959
#define Anum_mtm_local_tables_rel_schema 1
6060
#define Anum_mtm_local_tables_rel_name 2
6161

62+
struct DDLSharedState
63+
{
64+
LWLock *localtab_lock;
65+
} *ddl_shared;
6266

6367
typedef struct MtmGucEntry
6468
{
@@ -86,7 +90,6 @@ static bool DDLApplyInProgress;
8690
static HTAB *MtmGucHash = NULL;
8791
static dlist_head MtmGucList = DLIST_STATIC_INIT(MtmGucList);
8892

89-
static LWLock *MtmLocalTablesMapLock;
9093
static HTAB *MtmRemoteFunctions;
9194
static HTAB *MtmLocalTables;
9295

@@ -137,6 +140,7 @@ MtmDDLReplicationInit()
137140
{
138141
Size size = 0;
139142

143+
size = add_size(size, sizeof(struct DDLSharedState));
140144
size = add_size(size, hash_estimate_size(MULTIMASTER_MAX_LOCAL_TABLES,
141145
sizeof(Oid)));
142146
size = MAXALIGN(size);
@@ -161,14 +165,22 @@ MtmDDLReplicationInit()
161165
void
162166
MtmDDLReplicationShmemStartup(void)
163167
{
164-
HASHCTL info;
168+
HASHCTL info;
169+
bool found;
165170

166171
memset(&info, 0, sizeof(info));
167172
info.entrysize = info.keysize = sizeof(Oid);
168173

169174
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
170175

171-
MtmLocalTablesMapLock = &(GetNamedLWLockTranche("mtm-ddl"))->lock;
176+
ddl_shared = ShmemInitStruct("ddl",
177+
sizeof(struct DDLSharedState),
178+
&found);
179+
180+
if (!found)
181+
{
182+
ddl_shared->localtab_lock = &(GetNamedLWLockTranche("mtm-ddl"))->lock;
183+
}
172184

173185
MtmLocalTables = ShmemInitHash("MtmLocalTables",
174186
MULTIMASTER_MAX_LOCAL_TABLES, MULTIMASTER_MAX_LOCAL_TABLES,
@@ -1237,9 +1249,9 @@ MtmMakeRelationLocal(Oid relid)
12371249
{
12381250
if (OidIsValid(relid))
12391251
{
1240-
LWLockAcquire(MtmLocalTablesMapLock, LW_EXCLUSIVE);
1252+
LWLockAcquire(ddl_shared->localtab_lock, LW_EXCLUSIVE);
12411253
hash_search(MtmLocalTables, &relid, HASH_ENTER, NULL);
1242-
LWLockRelease(MtmLocalTablesMapLock);
1254+
LWLockRelease(ddl_shared->localtab_lock);
12431255
}
12441256
}
12451257

@@ -1282,11 +1294,11 @@ MtmIsRelationLocal(Relation rel)
12821294
{
12831295
bool found;
12841296

1285-
LWLockAcquire(MtmLocalTablesMapLock, LW_SHARED);
1297+
LWLockAcquire(ddl_shared->localtab_lock, LW_SHARED);
12861298
if (!Mtm->localTablesHashLoaded)
12871299
{
1288-
LWLockRelease(MtmLocalTablesMapLock);
1289-
LWLockAcquire(MtmLocalTablesMapLock, LW_EXCLUSIVE);
1300+
LWLockRelease(ddl_shared->localtab_lock);
1301+
LWLockAcquire(ddl_shared->localtab_lock, LW_EXCLUSIVE);
12901302
if (!Mtm->localTablesHashLoaded)
12911303
{
12921304
MtmLoadLocalTables();
@@ -1295,7 +1307,7 @@ MtmIsRelationLocal(Relation rel)
12951307
}
12961308

12971309
hash_search(MtmLocalTables, &RelationGetRelid(rel), HASH_FIND, &found);
1298-
LWLockRelease(MtmLocalTablesMapLock);
1310+
LWLockRelease(ddl_shared->localtab_lock);
12991311

13001312
return found;
13011313
}

src/include/multimaster.h

+5-2
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,11 @@ extern bool receiver_mtm_cfg_valid;
153153

154154
typedef struct
155155
{
156+
LWLock *lock;
157+
LWLock *commit_barrier;
158+
LWLock *receiver_barrier;
159+
LWLock *syncpoint_lock;
160+
156161
int my_node_id;
157162
bool stop_new_commits;
158163
XLogRecPtr latestSyncpoint;
@@ -176,8 +181,6 @@ extern MtmCurrentTrans MtmTx;
176181
extern MemoryContext MtmApplyContext;
177182

178183
/* Locks */
179-
extern LWLock *MtmLock;
180-
extern LWLock *MtmCommitBarrier;
181184
extern LWLock *MtmReceiverBarrier;
182185
extern LWLock *MtmSyncpointLock;
183186

src/multimaster.c

+7-12
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,6 @@ static TransactionManager MtmTM =
123123
MtmResumeTransaction
124124
};
125125

126-
LWLock *MtmLock;
127-
LWLock *MtmCommitBarrier;
128-
LWLock *MtmReceiverBarrier;
129-
LWLock *MtmSyncpointLock;
130-
131126
// XXX
132127
bool MtmBackgroundWorker;
133128
int MtmReplicationNodeId;
@@ -278,6 +273,11 @@ MtmSharedShmemStartup()
278273
Mtm->localTablesHashLoaded = false;
279274
Mtm->latestSyncpoint = InvalidXLogRecPtr;
280275

276+
Mtm->lock = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[0].lock);
277+
Mtm->commit_barrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[1].lock);
278+
Mtm->receiver_barrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[2].lock);
279+
Mtm->syncpoint_lock = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[3].lock);
280+
281281
for (i = 0; i < MtmMaxNodes; i++)
282282
{
283283
Mtm->peers[i].receiver_pid = InvalidPid;
@@ -291,11 +291,6 @@ MtmSharedShmemStartup()
291291

292292
RegisterXactCallback(MtmXactCallback, NULL);
293293

294-
MtmLock = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[0].lock);
295-
MtmCommitBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[1].lock);
296-
MtmReceiverBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[2].lock);
297-
MtmSyncpointLock = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[3].lock);
298-
299294
TM = &MtmTM;
300295
LWLockRelease(AddinShmemInitLock);
301296
}
@@ -984,9 +979,9 @@ mtm_join_node(PG_FUNCTION_ARGS)
984979
* Create syncpoints for all our peers so that new node can safely start
985980
* recovered replication connections.
986981
*/
987-
LWLockAcquire(MtmReceiverBarrier, LW_EXCLUSIVE);
982+
LWLockAcquire(Mtm->receiver_barrier, LW_EXCLUSIVE);
988983
curr_lsn = GetXLogWriteRecPtr();
989-
LWLockRelease(MtmReceiverBarrier);
984+
LWLockRelease(Mtm->receiver_barrier);
990985

991986
/* as we going to write that lsn on a new node, let's sync it */
992987
XLogFlush(curr_lsn);

src/pglogical_apply.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -855,9 +855,9 @@ mtm_send_xid_reply(TransactionId xid, int node_id, MtmMessageCode msg_code)
855855
DmqDestinationId dest_id;
856856
StringInfoData msg;
857857

858-
LWLockAcquire(MtmLock, LW_SHARED);
858+
LWLockAcquire(Mtm->lock, LW_SHARED);
859859
dest_id = Mtm->peers[node_id - 1].dmq_dest_id;
860-
LWLockRelease(MtmLock);
860+
LWLockRelease(Mtm->lock);
861861

862862
Assert(dest_id >= 0);
863863

@@ -882,9 +882,9 @@ mtm_send_gid_reply(char *gid, int node_id, MtmMessageCode msg_code)
882882
DmqDestinationId dest_id;
883883
StringInfoData msg;
884884

885-
LWLockAcquire(MtmLock, LW_SHARED);
885+
LWLockAcquire(Mtm->lock, LW_SHARED);
886886
dest_id = Mtm->peers[node_id - 1].dmq_dest_id;
887-
LWLockRelease(MtmLock);
887+
LWLockRelease(Mtm->lock);
888888

889889
Assert(dest_id >= 0);
890890

src/pglogical_output.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,7 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
477477
*/
478478
if (!hooks_data->recovery_done)
479479
{
480-
LWLockAcquire(MtmReceiverBarrier, LW_EXCLUSIVE);
480+
LWLockAcquire(Mtm->receiver_barrier, LW_EXCLUSIVE);
481481
hooks_data->recovery_done = true;
482482
mtm_log(ProtoTraceState, "Start building safe point to finish recovery");
483483
}
@@ -488,7 +488,7 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
488488
data->api->write_caughtup(ctx->out, data, ctx->reader->EndRecPtr);
489489
MtmOutputPluginWrite(ctx, true, true);
490490

491-
LWLockRelease(MtmReceiverBarrier);
491+
LWLockRelease(Mtm->receiver_barrier);
492492

493493
/*
494494
* This hook can be called mupltiple times when there is concurrent

src/pglogical_proto.c

+6-6
Original file line numberDiff line numberDiff line change
@@ -776,9 +776,9 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
776776
* to setup all stuff in shared memory. But seems that there is no such
777777
* callback in vanilla pg and adding one will require some carefull thoughts.
778778
*/
779-
LWLockAcquire(MtmLock, LW_EXCLUSIVE);
779+
LWLockAcquire(Mtm->lock, LW_EXCLUSIVE);
780780
Mtm->peers[MtmReplicationNodeId - 1].sender_pid = MyProcPid;
781-
LWLockRelease(MtmLock);
781+
LWLockRelease(Mtm->lock);
782782

783783
if (hooks_data->is_recovery)
784784
{
@@ -815,10 +815,10 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
815815
char *session_id = psprintf(INT64_FORMAT, hooks_data->session_id);
816816

817817
Mtm->stop_new_commits = true;
818-
LWLockAcquire(MtmCommitBarrier, LW_EXCLUSIVE);
818+
LWLockAcquire(Mtm->commit_barrier, LW_EXCLUSIVE);
819819
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERED, false);
820820
msg_xptr = LogLogicalMessage("P", session_id, strlen(session_id) + 1, false);
821-
LWLockRelease(MtmCommitBarrier);
821+
LWLockRelease(Mtm->commit_barrier);
822822
Mtm->stop_new_commits = false;
823823

824824
XLogFlush(msg_xptr);
@@ -837,9 +837,9 @@ MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
837837
{
838838
Assert(MtmReplicationNodeId >= 0);
839839

840-
LWLockAcquire(MtmLock, LW_EXCLUSIVE);
840+
LWLockAcquire(Mtm->lock, LW_EXCLUSIVE);
841841
Mtm->peers[MtmReplicationNodeId - 1].sender_pid = InvalidPid;
842-
LWLockRelease(MtmLock);
842+
LWLockRelease(Mtm->lock);
843843

844844
MtmStateProcessNeighborEvent(MtmReplicationNodeId,
845845
MTM_NEIGHBOR_WAL_SENDER_STOP, false);

src/pglogical_receiver.c

+7-7
Original file line numberDiff line numberDiff line change
@@ -289,16 +289,16 @@ MtmExecute(void* work, int size, MtmReceiverContext *receiver_ctx, bool no_pool)
289289
/* parallel_allowed should never be set during recovery */
290290
Assert( !(receiver_ctx->is_recovery && receiver_ctx->parallel_allowed) );
291291

292-
LWLockAcquire(MtmReceiverBarrier, LW_SHARED);
292+
LWLockAcquire(Mtm->receiver_barrier, LW_SHARED);
293293

294294
if (receiver_ctx->is_recovery || !receiver_ctx->parallel_allowed || no_pool)
295295
MtmExecutor(work, size, receiver_ctx);
296296
else
297297
BgwPoolExecute(&Mtm->pools[MtmReplicationNodeId-1], work, size, receiver_ctx);
298298

299299
/* Our error handler can release lock */
300-
if (LWLockHeldByMe(MtmReceiverBarrier))
301-
LWLockRelease(MtmReceiverBarrier);
300+
if (LWLockHeldByMe(Mtm->receiver_barrier))
301+
LWLockRelease(Mtm->receiver_barrier);
302302
}
303303

304304
/*
@@ -489,9 +489,9 @@ pglogical_receiver_at_exit(int status, Datum arg)
489489
{
490490
int node_id = DatumGetInt32(arg);
491491
BgwPoolCancel(&Mtm->pools[node_id - 1]);
492-
LWLockAcquire(MtmLock, LW_EXCLUSIVE);
492+
LWLockAcquire(Mtm->lock, LW_EXCLUSIVE);
493493
Mtm->peers[node_id - 1].receiver_pid = InvalidPid;
494-
LWLockRelease(MtmLock);
494+
LWLockRelease(Mtm->lock);
495495
}
496496

497497
void
@@ -581,10 +581,10 @@ pglogical_receiver_main(Datum main_arg)
581581
nodeId),
582582
true);
583583

584-
LWLockAcquire(MtmLock, LW_EXCLUSIVE);
584+
LWLockAcquire(Mtm->lock, LW_EXCLUSIVE);
585585
Mtm->peers[nodeId - 1].receiver_pid = MyProcPid;
586586
Mtm->peers[nodeId - 1].receiver_mode = mode;
587-
LWLockRelease(MtmLock);
587+
LWLockRelease(Mtm->lock);
588588

589589
// XXX: delete unnecessary modes
590590
Assert(mode == REPLMODE_RECOVERY || mode == REPLMODE_RECOVERED);

src/resolver.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,9 @@ scatter_status_requests(MtmConfig *mtm_cfg)
442442
StringInfoData msg;
443443
DmqDestinationId dest_id;
444444

445-
LWLockAcquire(MtmLock, LW_SHARED);
445+
LWLockAcquire(Mtm->lock, LW_SHARED);
446446
dest_id = Mtm->peers[node_id - 1].dmq_dest_id;
447-
LWLockRelease(MtmLock);
447+
LWLockRelease(Mtm->lock);
448448
Assert(dest_id >= 0);
449449

450450
initStringInfo(&msg);

0 commit comments

Comments
 (0)