Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4710350

Browse files
knizhnikkelvich
authored andcommitted
Code cleanup
1 parent de24524 commit 4710350

File tree

5 files changed

+55
-94
lines changed

5 files changed

+55
-94
lines changed

arbiter.c

+26-67
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
#include "replication/slot.h"
5757
#include "port/atomics.h"
5858
#include "tcop/utility.h"
59+
#include "libpq/ip.h"
5960

6061
#ifndef USE_EPOLL
6162
#ifdef __linux__
@@ -139,31 +140,6 @@ void MtmArbiterInitialize(void)
139140
RegisterBackgroundWorker(&MtmMonitorWorker);
140141
}
141142

142-
static int
143-
MtmResolveHostByName(const char *hostname, unsigned* addrs, unsigned* n_addrs)
144-
{
145-
struct sockaddr_in sin;
146-
struct hostent* hp;
147-
unsigned i;
148-
149-
sin.sin_addr.s_addr = inet_addr(hostname);
150-
if (sin.sin_addr.s_addr != INADDR_NONE) {
151-
memcpy(&addrs[0], &sin.sin_addr.s_addr, sizeof(sin.sin_addr.s_addr));
152-
*n_addrs = 1;
153-
return 1;
154-
}
155-
156-
hp = gethostbyname(hostname);
157-
if (hp == NULL || hp->h_addrtype != AF_INET) {
158-
return 0;
159-
}
160-
for (i = 0; hp->h_addr_list[i] != NULL && i < *n_addrs; i++) {
161-
memcpy(&addrs[i], hp->h_addr_list[i], sizeof(addrs[i]));
162-
}
163-
*n_addrs = i;
164-
return 1;
165-
}
166-
167143
static int stop = 0;
168144
static void SetStop(int sig)
169145
{
@@ -352,7 +328,6 @@ static void MtmCheckResponse(MtmArbiterMessage* resp)
352328

353329
static void MtmScheduleHeartbeat()
354330
{
355-
// Assert(!last_sent_heartbeat || last_sent_heartbeat + MSEC_TO_USEC(MtmHeartbeatRecvTimeout) >= MtmGetSystemTime());
356331
if (!stop) {
357332
enable_timeout_after(heartbeat_timer, MtmHeartbeatSendTimeout);
358333
send_heartbeat = true;
@@ -399,7 +374,6 @@ static void MtmSendHeartbeat()
399374
close(sockets[i]);
400375
sockets[i] = -1;
401376
MtmReconnectNode(i+1); /* set reconnect mask to force node reconnent */
402-
//MtmOnNodeConnect(i+1);
403377
}
404378
MTM_LOG4("Send heartbeat to node %d with timestamp %lld", i+1, now);
405379
}
@@ -426,23 +400,31 @@ void MtmCheckHeartbeat()
426400

427401
static int MtmConnectSocket(int node, int port, time_t timeout)
428402
{
429-
struct sockaddr_in sock_inet;
430-
unsigned addrs[MAX_ROUTES];
431-
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
403+
struct addrinfo *addrs = NULL;
404+
struct addrinfo *addr;
405+
struct addrinfo hint;
406+
char portstr[MAXPGPATH];
432407
MtmHandshakeMessage req;
433408
MtmArbiterMessage resp;
434409
int sd;
410+
int ret;
435411
timestamp_t start = MtmGetSystemTime();
436412
char const* host = Mtm->nodes[node].con.hostName;
437413
nodemask_t save_mask = busy_mask;
438414
timestamp_t afterWait;
439415
timestamp_t beforeWait;
440416

441-
sock_inet.sin_family = AF_INET;
442-
sock_inet.sin_port = htons(port);
417+
/* Initialize hint structure */
418+
MemSet(&hint, 0, sizeof(hint));
419+
hint.ai_socktype = SOCK_STREAM;
420+
hint.ai_family = AF_UNSPEC;
421+
422+
snprintf(portstr, sizeof(portstr), "%d", port);
443423

444-
if (!MtmResolveHostByName(host, addrs, &n_addrs)) {
445-
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name", host);
424+
ret = pg_getaddrinfo_all(host, portstr, &hint, &addrs);
425+
if (ret != 0)
426+
{
427+
MTM_ELOG(LOG, "Arbiter failed to resolve host '%s' by name: %s", host, gai_strerror(ret));
446428
return -1;
447429
}
448430
BIT_SET(busy_mask, node);
@@ -459,13 +441,14 @@ static int MtmConnectSocket(int node, int port, time_t timeout)
459441
rc = fcntl(sd, F_SETFL, O_NONBLOCK);
460442
if (rc < 0) {
461443
MTM_ELOG(LOG, "Arbiter failed to switch socket to non-blocking mode: %d", errno);
444+
close(sd);
462445
busy_mask = save_mask;
463446
return -1;
464447
}
465-
for (i = 0; i < n_addrs; ++i) {
466-
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
448+
for (addr = addrs; addr != NULL; addr = addr->ai_next)
449+
{
467450
do {
468-
rc = connect(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
451+
rc = connect(sd, addr->ai_addr, addr->ai_addrlen);
469452
} while (rc < 0 && errno == EINTR);
470453

471454
if (rc >= 0 || errno == EINPROGRESS) {
@@ -638,6 +621,7 @@ static void MtmAcceptOneConnection()
638621
rc = MtmReadSocket(fd, &req, sizeof req);
639622
if (rc < sizeof(req)) {
640623
MTM_ELOG(WARNING, "Arbiter failed to handshake socket: %d, errno=%d", rc, errno);
624+
close(fd);
641625
} else if (req.hdr.code != MSG_HANDSHAKE && req.hdr.dxid != HANDSHAKE_MAGIC) {
642626
MTM_ELOG(WARNING, "Arbiter get unexpected handshake message %d", req.hdr.code);
643627
close(fd);
@@ -693,7 +677,9 @@ static void MtmAcceptIncomingConnections()
693677
if (gateway < 0) {
694678
MTM_ELOG(ERROR, "Arbiter failed to create socket: %d", errno);
695679
}
696-
setsockopt(gateway, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on);
680+
if (setsockopt(gateway, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof on) < 0) {
681+
MTM_ELOG(ERROR, "Arbiter failed to set options for socket: %d", errno);
682+
}
697683

698684
if (bind(gateway, (struct sockaddr*)&sock_inet, sizeof(sock_inet)) < 0) {
699685
MTM_ELOG(ERROR, "Arbiter failed to bind socket: %d", errno);
@@ -726,7 +712,6 @@ static void MtmAppendBuffer(MtmBuffer* txBuffer, MtmArbiterMessage* msg)
726712

727713
static void MtmSender(Datum arg)
728714
{
729-
sigset_t sset;
730715
int nNodes = MtmMaxNodes;
731716
int i;
732717

@@ -737,8 +722,6 @@ static void MtmSender(Datum arg)
737722
signal(SIGINT, SetStop);
738723
signal(SIGQUIT, SetStop);
739724
signal(SIGTERM, SetStop);
740-
sigfillset(&sset);
741-
sigprocmask(SIG_UNBLOCK, &sset, NULL);
742725

743726
/* We're now ready to receive signals */
744727
BackgroundWorkerUnblockSignals();
@@ -815,13 +798,9 @@ static bool MtmRecovery()
815798

816799
static void MtmMonitor(Datum arg)
817800
{
818-
sigset_t sset;
819-
820801
signal(SIGINT, SetStop);
821802
signal(SIGQUIT, SetStop);
822803
signal(SIGTERM, SetStop);
823-
sigfillset(&sset);
824-
sigprocmask(SIG_UNBLOCK, &sset, NULL);
825804

826805
/* We're now ready to receive signals */
827806
BackgroundWorkerUnblockSignals();
@@ -840,7 +819,6 @@ static void MtmMonitor(Datum arg)
840819

841820
static void MtmReceiver(Datum arg)
842821
{
843-
sigset_t sset;
844822
int nNodes = MtmMaxNodes;
845823
int nResponses;
846824
int i, j, n, rc;
@@ -860,8 +838,6 @@ static void MtmReceiver(Datum arg)
860838
signal(SIGINT, SetStop);
861839
signal(SIGQUIT, SetStop);
862840
signal(SIGTERM, SetStop);
863-
sigfillset(&sset);
864-
sigprocmask(SIG_UNBLOCK, &sset, NULL);
865841

866842
/* We're now ready to receive signals */
867843
BackgroundWorkerUnblockSignals();
@@ -1078,7 +1054,6 @@ static void MtmReceiver(Datum arg)
10781054
} else if (MtmUseDtm) {
10791055
ts->votedMask = 0;
10801056
MTM_TXTRACE(ts, "MtmTransReceiver send MSG_PRECOMMIT");
1081-
//MtmSend2PCMessage(ts, MSG_PRECOMMIT);
10821057
Assert(replorigin_session_origin == InvalidRepOriginId);
10831058
MTM_LOG2("SetPreparedTransactionState for %s", ts->gid);
10841059
MtmUnlock();
@@ -1130,7 +1105,7 @@ static void MtmReceiver(Datum arg)
11301105
} else {
11311106
Assert(ts->status == TRANSACTION_STATUS_ABORTED);
11321107
MTM_ELOG(WARNING, "Receive PRECOMMITTED response for aborted transaction %s (%llu) from node %d",
1133-
ts->gid, (long64)ts->xid, node); // How it can happen? Should we use assert here?
1108+
ts->gid, (long64)ts->xid, node);
11341109
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
11351110
MtmWakeUpBackend(ts);
11361111
}
@@ -1140,23 +1115,7 @@ static void MtmReceiver(Datum arg)
11401115
Assert(false);
11411116
}
11421117
} else {
1143-
switch (msg->code) {
1144-
case MSG_PRECOMMIT:
1145-
Assert(false); // Now sent through pglogical
1146-
if (ts->status == TRANSACTION_STATUS_IN_PROGRESS) {
1147-
ts->status = TRANSACTION_STATUS_UNKNOWN;
1148-
ts->csn = MtmAssignCSN();
1149-
MtmAdjustSubtransactions(ts);
1150-
MtmSend2PCMessage(ts, MSG_PRECOMMITTED);
1151-
} else if (ts->status == TRANSACTION_STATUS_ABORTED) {
1152-
MtmSend2PCMessage(ts, MSG_ABORTED);
1153-
} else {
1154-
MTM_ELOG(WARNING, "Transaction %s is already %s", ts->gid, MtmTxnStatusMnem[ts->status]);
1155-
}
1156-
break;
1157-
default:
1158-
Assert(false);
1159-
}
1118+
Assert(false); /* All broadcasts are now sent through pglogical */
11601119
}
11611120
}
11621121
MtmUnlock();

bgwpool.c

+5-6
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ static void BgwPoolMainLoop(BgwPool* pool)
3131
int size;
3232
void* work;
3333
static PortalData fakePortal;
34-
sigset_t sset;
3534

3635
MtmIsLogicalReceiver = true;
3736
MtmPool = pool;
@@ -40,9 +39,6 @@ static void BgwPoolMainLoop(BgwPool* pool)
4039
signal(SIGQUIT, BgwShutdownWorker);
4140
signal(SIGTERM, BgwShutdownWorker);
4241

43-
sigfillset(&sset);
44-
sigprocmask(SIG_UNBLOCK, &sset, NULL);
45-
4642
BackgroundWorkerUnblockSignals();
4743
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
4844
ActivePortal = &fakePortal;
@@ -57,7 +53,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
5753
}
5854
size = *(int*)&pool->queue[pool->head];
5955
Assert(size < pool->size);
60-
work = malloc(size);
56+
work = palloc(size);
6157
pool->pending -= 1;
6258
pool->active += 1;
6359
if (pool->lastPeakTime == 0 && pool->active == pool->nWorkers && pool->pending != 0) {
@@ -80,7 +76,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
8076
}
8177
SpinLockRelease(&pool->lock);
8278
pool->executor(work, size);
83-
free(work);
79+
pfree(work);
8480
SpinLockAcquire(&pool->lock);
8581
pool->active -= 1;
8682
pool->lastPeakTime = 0;
@@ -93,6 +89,9 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
9389
{
9490
MtmPool = pool;
9591
pool->queue = (char*)ShmemAlloc(queueSize);
92+
if (pool->queue == NULL) {
93+
elog(PANIC, "Failed to allocate memory for background workers pool: %lld bytes requested", (long64)queueSize);
94+
}
9695
pool->executor = executor;
9796
PGSemaphoreCreate(&pool->available);
9897
PGSemaphoreCreate(&pool->overflow);

multimaster.c

+9-15
Original file line numberDiff line numberDiff line change
@@ -1255,7 +1255,6 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
12551255
if (x->status != TRANSACTION_STATUS_ABORTED) {
12561256
MtmLock(LW_EXCLUSIVE);
12571257
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_FIND, NULL);
1258-
//tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_REMOVE, NULL);
12591258
if (tm == NULL) {
12601259
MTM_ELOG(WARNING, "Global transaciton ID '%s' is not found", x->gid);
12611260
} else {
@@ -1394,6 +1393,9 @@ void MtmSendMessage(MtmArbiterMessage* msg)
13941393
MtmMessageQueue* sendQueue = Mtm->sendQueue;
13951394
if (mq == NULL) {
13961395
mq = (MtmMessageQueue*)ShmemAlloc(sizeof(MtmMessageQueue));
1396+
if (mq == NULL) {
1397+
elog(PANIC, "Failed to allocate shared memory for message queue");
1398+
}
13971399
} else {
13981400
Mtm->freeQueue = mq->next;
13991401
}
@@ -1425,20 +1427,8 @@ void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd)
14251427
msg.lockReq = Mtm->nodeLockerMask != 0;
14261428
memcpy(msg.gid, ts->gid, MULTIMASTER_MAX_GID_SIZE);
14271429

1428-
if (MtmIsCoordinator(ts)) {
1429-
int i;
1430-
Assert(false); // All broadcasts are now done through logical decoding
1431-
for (i = 0; i < Mtm->nAllNodes; i++)
1432-
{
1433-
if (BIT_CHECK(ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask, i))
1434-
{
1435-
Assert(TransactionIdIsValid(ts->xids[i]));
1436-
msg.node = i+1;
1437-
msg.dxid = ts->xids[i];
1438-
MtmSendMessage(&msg);
1439-
}
1440-
}
1441-
} else if (!BIT_CHECK(Mtm->disabledNodeMask, ts->gtid.node-1)) {
1430+
Assert(!MtmIsCoordinator(ts)); /* All broadcasts are now done through logical decoding */
1431+
if (!BIT_CHECK(Mtm->disabledNodeMask, ts->gtid.node-1)) {
14421432
MTM_LOG2("Send %s message to node %d xid=%d gid=%s", MtmMessageKindMnem[cmd], ts->gtid.node, ts->gtid.xid, ts->gid);
14431433
msg.node = ts->gtid.node;
14441434
msg.dxid = ts->gtid.xid;
@@ -4674,6 +4664,10 @@ void MtmUpdateLockGraph(int nodeId, void const* messageBody, int messageSize)
46744664
if (messageSize > allocated) {
46754665
allocated = Max(Max(MULTIMASTER_LOCK_BUF_INIT_SIZE, allocated*2), messageSize);
46764666
Mtm->nodes[nodeId-1].lockGraphData = ShmemAlloc(allocated);
4667+
if (Mtm->nodes[nodeId-1].lockGraphData == NULL) {
4668+
elog(PANIC, "Failed to allocate shared memory for lock graph: %d bytes requested",
4669+
allocated);
4670+
}
46774671
Mtm->nodes[nodeId-1].lockGraphAllocated = allocated;
46784672
}
46794673
memcpy(Mtm->nodes[nodeId-1].lockGraphData, messageBody, messageSize);

pglogical_apply.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,7 @@ process_remote_commit(StringInfo in)
650650
case PGLOGICAL_PRECOMMIT_PREPARED:
651651
{
652652
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
653-
strcpy(gid, pq_getmsgstring(in));
653+
strncpy(gid, pq_getmsgstring(in), sizeof gid);
654654
MTM_LOG2("%d: PGLOGICAL_PRECOMMIT_PREPARED %s", MyProcPid, gid);
655655
MtmBeginSession(origin_node);
656656
MtmPrecommitTransaction(gid);
@@ -671,7 +671,7 @@ process_remote_commit(StringInfo in)
671671
case PGLOGICAL_PREPARE:
672672
{
673673
Assert(IsTransactionState() && TransactionIdIsValid(MtmGetCurrentTransactionId()));
674-
strcpy(gid, pq_getmsgstring(in));
674+
strncpy(gid, pq_getmsgstring(in), sizeof gid);
675675
if (MtmExchangeGlobalTransactionStatus(gid, TRANSACTION_STATUS_IN_PROGRESS) == TRANSACTION_STATUS_ABORTED) {
676676
MTM_LOG1("Avoid prepare of previously aborted global transaction %s", gid);
677677
AbortCurrentTransaction();
@@ -704,7 +704,7 @@ process_remote_commit(StringInfo in)
704704
{
705705
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
706706
csn = pq_getmsgint64(in);
707-
strcpy(gid, pq_getmsgstring(in));
707+
strncpy(gid, pq_getmsgstring(in), sizeof gid);
708708
MTM_LOG2("PGLOGICAL_COMMIT_PREPARED commit: csn=%lld, gid=%s, lsn=%llx", csn, gid, end_lsn);
709709
MtmResetTransaction();
710710
StartTransactionCommand();
@@ -721,7 +721,7 @@ process_remote_commit(StringInfo in)
721721
case PGLOGICAL_ABORT_PREPARED:
722722
{
723723
Assert(!TransactionIdIsValid(MtmGetCurrentTransactionId()));
724-
strcpy(gid, pq_getmsgstring(in));
724+
strncpy(gid, pq_getmsgstring(in), sizeof gid);
725725
/* MtmRollbackPreparedTransaction will set origin session itself */
726726
MTM_LOG1("Receive ABORT_PREPARED logical message for transaction %s from node %d", gid, origin_node);
727727
MtmRollbackPreparedTransaction(origin_node, gid);

spill.c

+11-2
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,12 @@ void MtmCreateSpillDirectory(int node_id)
3535
mkdir(path, S_IRWXU);
3636

3737
spill_dir = AllocateDir(path);
38-
38+
if (spill_dir == NULL) {
39+
ereport(PANIC,
40+
(errcode_for_file_access(),
41+
MTM_ERRMSG("pglogical_receiver failed to create spill directory \"%s\": %m",
42+
path)));
43+
}
3944
while ((spill_de = ReadDir(spill_dir, path)) != NULL)
4045
{
4146
if (strncmp(spill_de->d_name, "txn", 3) == 0)
@@ -90,7 +95,11 @@ int MtmOpenSpillFile(int node_id, int file_id)
9095
MTM_ERRMSG("pglogical_apply could not open spill file \"%s\": %m",
9196
path)));
9297
}
93-
unlink(path); /* Should remove file on close */
98+
if (unlink(path) < 0) { /* Should remove file on close */
99+
ereport(LOG,
100+
(errcode_for_file_access(),
101+
MTM_ERRMSG("pglogical_apply failed to unlink spill file: %m")));
102+
}
94103
return fd;
95104
}
96105

0 commit comments

Comments
 (0)