Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 9b94f6c

Browse files
knizhnik authored and kelvich committed
Unlock multimaster lock in atexit handler and FINALLY blocks
1 parent 5b7b997 commit 9b94f6c

File tree

4 files changed

+52
-18
lines changed

4 files changed

+52
-18
lines changed

arbiter.c

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
406406
char portstr[MAXPGPATH];
407407
MtmHandshakeMessage req;
408408
MtmArbiterMessage resp;
409-
int sd;
409+
int sd = -1;
410410
int ret;
411411
timestamp_t start = MtmGetSystemTime();
412412
char const* host = Mtm->nodes[node].con.hostName;
@@ -424,7 +424,7 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
424424
ret = pg_getaddrinfo_all(host, portstr, &hint, &addrs);
425425
if (ret != 0)
426426
{
427-
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: %s", host, gai_strerror(ret));
427+
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: (%d) %s", host, ret, gai_strerror(ret));
428428
return -1;
429429
}
430430
BIT_SET(busy_mask, node);
@@ -435,15 +435,12 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
435435
sd = socket(AF_INET, SOCK_STREAM, 0);
436436
if (sd < 0) {
437437
MTM_ELOG(LOG, "Arbiter failed to create socket: %d", errno);
438-
busy_mask = save_mask;
439-
return -1;
438+
goto Error;
440439
}
441440
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
442441
if (rc < 0) {
443442
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
444-
close(sd);
445-
busy_mask = save_mask;
446-
return -1;
443+
goto Error;
447444
}
448445
for (addr = addrs; addr != NULL; addr = addr->ai_next)
449446
{
@@ -461,18 +458,14 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
461458
beforeWait = MtmGetSystemTime();
462459
if (errno != EINPROGRESS || start + MSEC_TO_USEC(timeout) < beforeWait ) {
463460
MTM_ELOG(WARNING, "Arbiter failed to connect to %s:%d: error=%d", host, port, errno);
464-
close(sd);
465-
busy_mask = save_mask;
466-
return -1;
461+
goto Error;
467462
} else {
468463
rc = MtmWaitSocket(sd, true, MtmHeartbeatSendTimeout);
469464
if (rc == 1) {
470465
socklen_t optlen = sizeof(int);
471466
if (getsockopt(sd, SOL_SOCKET, SO_ERROR, (void*)&rc, &optlen) < 0) {
472467
MTM_ELOG(WARNING, "Arbiter failed to getsockopt for %s:%d: error=%d", host, port, errno);
473-
close(sd);
474-
busy_mask = save_mask;
475-
return -1;
468+
goto Error;
476469
}
477470
if (rc == 0) {
478471
break;
@@ -513,16 +506,28 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
513506
close(sd);
514507
goto Retry;
515508
}
516-
509+
if (addrs)
510+
pg_freeaddrinfo_all(hint.ai_family, addrs);
511+
517512
MtmLock(LW_EXCLUSIVE);
518513
MtmCheckResponse(&resp);
519514
MtmUnlock();
520515

521516
MtmOnNodeConnect(node+1);
522517

523518
busy_mask = save_mask;
524-
519+
525520
return sd;
521+
522+
Error:
523+
busy_mask = save_mask;
524+
if (sd >= 0) {
525+
close(sd);
526+
}
527+
if (addrs) {
528+
pg_freeaddrinfo_all(hint.ai_family, addrs);
529+
}
530+
return -1;
526531
}
527532

528533

multimaster.c

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ int MtmMin2PCTimeout;
233233
int MtmMax2PCRatio;
234234
bool MtmUseDtm;
235235
bool MtmPreserveCommitOrder;
236-
bool MtmVolksWagenMode;
236+
bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some NOTICE's and use local sequences */
237237

238238
TransactionId MtmUtilityProcessedInXid;
239239

@@ -263,6 +263,23 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
263263
ProcessUtilityContext context, ParamListInfo params,
264264
DestReceiver *dest, char *completionTag);
265265

266+
static bool MtmAtExitHookRegistered = false;
267+
268+
/*
269+
* Release the multimaster main lock if the current process still holds it; do nothing otherwise.
270+
* This function is called from the atexit handler when the backend is terminated because of a critical error,
271+
* and from error-handling (PG_CATCH) blocks when an error is caught, so the lock is not left held.
272+
*/
273+
void MtmReleaseLock(void)
274+
{
275+
if (MtmLockCount != 0) { /* a non-zero count is treated as "lock is held by this process" */
276+
Assert(Mtm->lastLockHolder == MyProcPid); /* sanity check: the recorded holder must be this process */
277+
MtmLockCount = 0; /* drop the whole nesting count at once, not just one level */
278+
Mtm->lastLockHolder = 0; /* clear the recorded holder before releasing */
279+
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]); /* release the underlying state LWLock */
280+
}
281+
}
282+
266283
/*
267284
* -------------------------------------------
268285
* Synchronize access to MTM structures.
@@ -276,8 +293,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
276293
void MtmLock(LWLockMode mode)
277294
{
278295
timestamp_t start, stop;
296+
if (!MtmAtExitHookRegistered) {
297+
atexit(MtmReleaseLock);
298+
MtmAtExitHookRegistered = true;
299+
}
279300
if (mode == LW_EXCLUSIVE || MtmLockCount != 0) {
280301
if (MtmLockCount++ != 0) {
302+
Assert(Mtm->lastLockHolder == MyProcPid);
281303
return;
282304
}
283305
}
@@ -293,6 +315,7 @@ void MtmLock(LWLockMode mode)
293315
void MtmUnlock(void)
294316
{
295317
if (MtmLockCount != 0 && --MtmLockCount != 0) {
318+
Assert(Mtm->lastLockHolder == MyProcPid);
296319
return;
297320
}
298321
LWLockRelease((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID]);
@@ -2421,6 +2444,7 @@ static void MtmInitialize()
24212444
Mtm = (MtmState*)ShmemInitStruct(MULTIMASTER_NAME, sizeof(MtmState) + sizeof(MtmNodeInfo)*(MtmMaxNodes-1), &found);
24222445
if (!found)
24232446
{
2447+
MemSet(Mtm, 0, sizeof(MtmState) + sizeof(MtmNodeInfo)*(MtmMaxNodes-1));
24242448
Mtm->status = MTM_INITIALIZATION;
24252449
Mtm->recoverySlot = 0;
24262450
Mtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);

multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,5 +419,6 @@ extern void MtmPrecommitTransaction(char const* gid);
419419
extern char* MtmGucSerialize(void);
420420
extern bool MtmTransIsActive(void);
421421
extern MtmTransState* MtmGetActiveTransaction(MtmL2List* list);
422+
extern void MtmReleaseLock(void);
422423

423424
#endif

pglogical_apply.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1041,6 +1041,7 @@ void MtmExecutor(void* work, size_t size)
10411041
int spill_file = -1;
10421042
int save_cursor = 0;
10431043
int save_len = 0;
1044+
MemoryContext topContext;
10441045

10451046
s.data = work;
10461047
s.len = size;
@@ -1054,7 +1055,7 @@ void MtmExecutor(void* work, size_t size)
10541055
ALLOCSET_DEFAULT_INITSIZE,
10551056
ALLOCSET_DEFAULT_MAXSIZE);
10561057
}
1057-
MemoryContextSwitchTo(MtmApplyContext);
1058+
topContext = MemoryContextSwitchTo(MtmApplyContext);
10581059

10591060
replorigin_session_origin = InvalidRepOriginId;
10601061
PG_TRY();
@@ -1144,7 +1145,9 @@ void MtmExecutor(void* work, size_t size)
11441145
}
11451146
PG_CATCH();
11461147
{
1147-
MemoryContext oldcontext = MemoryContextSwitchTo(MtmApplyContext);
1148+
MemoryContext oldcontext;
1149+
MtmReleaseLock();
1150+
oldcontext = MemoryContextSwitchTo(MtmApplyContext);
11481151
MtmHandleApplyError();
11491152
MemoryContextSwitchTo(oldcontext);
11501153
EmitErrorReport();
@@ -1162,5 +1165,6 @@ void MtmExecutor(void* work, size_t size)
11621165
}
11631166
#endif
11641167
MemoryContextResetAndDeleteChildren(MtmApplyContext);
1168+
MemoryContextSwitchTo(topContext);
11651169
}
11661170

0 commit comments

Comments
 (0)