Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 10d0eda

Browse files
committed
dmq integration: make setup procedures more compatible with previous mm versions
1 parent 29d51e1 commit 10d0eda

11 files changed

+121
-85
lines changed

Cluster.pm

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,12 @@ sub configure
120120
listen_addresses = '$host'
121121
unix_socket_directories = '$unix_sock_dir'
122122
port = $pgport
123-
max_prepared_transactions = 10
123+
max_prepared_transactions = 30
124124
max_connections = 10
125125
max_worker_processes = 100
126126
wal_level = logical
127127
max_wal_senders = 6
128128
wal_sender_timeout = 0
129-
default_transaction_isolation = 'repeatable read'
130129
max_replication_slots = 6
131130
shared_preload_libraries = 'multimaster'
132131
shared_buffers = 16MB
@@ -160,6 +159,7 @@ sub start
160159
foreach my $node (@$nodes)
161160
{
162161
$node->start();
162+
$node->safe_psql('postgres', "create extension multimaster;");
163163
note( "Starting node with connstr 'dbname=postgres port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
164164
}
165165
}
@@ -230,7 +230,7 @@ sub stop
230230
$node->stop($mode);
231231
}
232232

233-
$self->dumplogs();
233+
# $self->dumplogs();
234234

235235
return $ok;
236236
}

bgwpool.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
typedef void(*BgwPoolExecutor)(void* work, size_t size);
1010

11-
typedef ulong64 timestamp_t;
11+
typedef long timestamp_t;
1212

1313

1414
#define MAX_DBNAME_LEN 30

commit.c

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "utils/guc.h"
1616
#include "miscadmin.h"
1717
#include "commands/dbcommands.h"
18+
#include "tcop/tcopprot.h"
1819

1920
#include "logger.h"
2021

@@ -63,18 +64,19 @@ MtmBeginTransaction(MtmCurrentTrans* x)
6364
x->containsDML = false; // will be set by executor hook
6465
x->isTransactionBlock = IsTransactionBlock();
6566

66-
// /* Application name can be changed using PGAPPNAME environment variable */
67-
// if (x->isDistributed && Mtm->status != MTM_ONLINE
68-
// && strcmp(application_name, MULTIMASTER_ADMIN) != 0
69-
// && strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) != 0)
70-
// {
71-
// /* Reject all user's transactions at offline cluster.
72-
// * Allow execution of transaction by bg-workers to makeit possible to perform recovery.
73-
// */
74-
// MTM_ELOG(ERROR,
75-
// "Multimaster node is not online: current status %s",
76-
// MtmNodeStatusMnem[Mtm->status]);
77-
// }
67+
/* Application name can be changed using PGAPPNAME environment variable */
68+
if (x->isDistributed && Mtm->status != MTM_ONLINE
69+
&& strcmp(application_name, MULTIMASTER_ADMIN) != 0
70+
&& strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) != 0
71+
&& debug_query_string && pg_strcasecmp(debug_query_string, "create extension multimaster;") != 0)
72+
{
73+
/* Reject all user's transactions at offline cluster.
74+
* Allow execution of transaction by bg-workers to makeit possible to perform recovery.
75+
*/
76+
MTM_ELOG(ERROR,
77+
"Multimaster node is not online: current status %s",
78+
MtmNodeStatusMnem[Mtm->status]);
79+
}
7880
}
7981

8082
static void
@@ -104,7 +106,7 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
104106
char stream[DMQ_NAME_MAXLEN];
105107
pgid_t gid;
106108

107-
if (!x->isDistributed || !x->containsDML)
109+
if (!x->isDistributed || !x->containsDML || !Mtm->extension_created)
108110
return false;
109111

110112
if (!DmqSubscribed)

multimaster.c

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -435,20 +435,26 @@ timestamp_t MtmGetCurrentTime(void)
435435
return MtmGetSystemTime() + Mtm->timeShift;
436436
}
437437

438-
void MtmSleep(timestamp_t interval)
438+
void MtmSleep(timestamp_t usec)
439439
{
440-
timestamp_t waketm = MtmGetCurrentTime() + interval;
440+
timestamp_t waketm = MtmGetCurrentTime() + usec;
441+
441442
for (;;)
442443
{
444+
int rc;
443445
timestamp_t sleepfor = waketm - MtmGetCurrentTime();
444446

445-
pg_usleep(sleepfor);
446-
if (MtmGetCurrentTime() < waketm)
447-
{
448-
/* Assert(errno == EINTR); */
449-
continue;
450-
}
451-
break;
447+
CHECK_FOR_INTERRUPTS();
448+
449+
rc = WaitLatch(MyLatch,
450+
WL_TIMEOUT | WL_POSTMASTER_DEATH,
451+
sleepfor/1000.0, WAIT_EVENT_BGWORKER_STARTUP);
452+
453+
if (rc & WL_POSTMASTER_DEATH)
454+
proc_exit(1);
455+
456+
if (MtmGetCurrentTime() > waketm)
457+
break;
452458
}
453459
}
454460

@@ -1177,10 +1183,10 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
11771183
MtmLock(LW_EXCLUSIVE);
11781184
if (MtmMin2PCTimeout != 0 && now > deadline) {
11791185
if (ts->isPrepared) {
1180-
MTM_ELOG(LOG, "Distributed transaction %s (%llu) is not committed in %lld msec", ts->gid, (long64)ts->xid, USEC_TO_MSEC(now - start));
1186+
// MTM_ELOG(LOG, "Distributed transaction %s (%llu) is not committed in %lld msec", ts->gid, (long64)ts->xid, USEC_TO_MSEC(now - start));
11811187
} else {
1182-
MTM_ELOG(WARNING, "Commit of distributed transaction %s (%llu) is canceled because of %lld msec timeout expiration",
1183-
ts->gid, (long64)ts->xid, USEC_TO_MSEC(timeout));
1188+
// MTM_ELOG(WARNING, "Commit of distributed transaction %s (%llu) is canceled because of %lld msec timeout expiration",
1189+
// ts->gid, (long64)ts->xid, USEC_TO_MSEC(timeout));
11841190
MtmAbortTransaction(ts);
11851191
break;
11861192
}
@@ -2398,6 +2404,7 @@ static void MtmInitialize()
23982404
if (!found)
23992405
{
24002406
MemSet(Mtm, 0, sizeof(MtmState) + sizeof(MtmNodeInfo)*(MtmMaxNodes-1));
2407+
Mtm->extension_created = false;
24012408
Mtm->status = MTM_DISABLED; //MTM_INITIALIZATION;
24022409
Mtm->recoverySlot = 0;
24032410
Mtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);
@@ -2481,19 +2488,6 @@ static void MtmInitialize()
24812488

24822489
MtmCheckControlFile();
24832490

2484-
/* Start dmq senders */
2485-
for (i = 0; i < MtmNodes; i++)
2486-
{
2487-
int destination_id;
2488-
2489-
if (i + 1 == MtmNodeId)
2490-
continue;
2491-
destination_id = dmq_destination_add(MtmConnections[i].connStr,
2492-
psprintf("node%d", MtmNodeId),
2493-
psprintf("node%d", i + 1),
2494-
MtmHeartbeatSendTimeout);
2495-
Mtm->nodes[i].destination_id = destination_id;
2496-
}
24972491
}
24982492

24992493
static void
@@ -3394,7 +3388,7 @@ void MtmFinishPreparedTransaction(MtmTransState* ts, bool commit)
33943388
* During recovery we need to open only one replication slot from which node should receive all transactions.
33953389
* Slots at other nodes should be removed.
33963390
*/
3397-
MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown)
3391+
MtmReplicationMode MtmGetReplicationMode(int nodeId)
33983392
{
33993393
MtmLock(LW_EXCLUSIVE);
34003394

@@ -3410,8 +3404,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
34103404
BIT_CHECK(EFFECTIVE_CONNECTIVITY_MASK, MtmNodeId - 1))
34113405
{
34123406
MtmUnlock();
3413-
if (*shutdown)
3414-
return REPLMODE_EXIT;
34153407
MtmSleep(STATUS_POLL_DELAY);
34163408
MtmLock(LW_EXCLUSIVE);
34173409
}
@@ -3435,8 +3427,6 @@ MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shut
34353427
while (BIT_CHECK(Mtm->disabledNodeMask, MtmNodeId - 1))
34363428
{
34373429
MtmUnlock();
3438-
if (*shutdown)
3439-
return REPLMODE_EXIT;
34403430
MtmSleep(STATUS_POLL_DELAY);
34413431
MtmLock(LW_EXCLUSIVE);
34423432
}
@@ -4267,7 +4257,8 @@ typedef struct
42674257
int nodeId;
42684258
} MtmGetClusterInfoCtx;
42694259

4270-
static void erase_option_from_connstr(const char *option, char *connstr)
4260+
void
4261+
erase_option_from_connstr(const char *option, char *connstr)
42714262
{
42724263
char *needle = psprintf("%s=", option);
42734264
while (1) {
@@ -5772,3 +5763,22 @@ MtmToggleDML(void)
57725763
{
57735764
MtmTx.containsDML = true;
57745765
}
5766+
5767+
5768+
void
5769+
MtmWaitForExtensionCreation(void)
5770+
{
5771+
for (;;)
5772+
{
5773+
RangeVar *rv;
5774+
Oid rel_oid;
5775+
int rc;
5776+
5777+
StartTransactionCommand();
5778+
rv = makeRangeVar(MULTIMASTER_SCHEMA_NAME, "local_tables", -1);
5779+
rel_oid = RangeVarGetRelid(rv, NoLock, true);
5780+
CommitTransactionCommand();
5781+
5782+
MtmSleep(USECS_PER_SEC);
5783+
}
5784+
}

multimaster.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ typedef struct {
316316

317317
typedef struct
318318
{
319+
bool extension_created;
319320
MtmNodeStatus status; /* Status of this node */
320321
/* A human-readable description of why the current status was set */
321322
char *statusReason;
@@ -451,6 +452,9 @@ extern void MtmGenerateGid(char *gid, TransactionId xid);
451452
extern int MtmGidParseNodeId(const char* gid);
452453
extern TransactionId MtmGidParseXid(const char* gid);
453454

455+
extern void MtmWaitForExtensionCreation(void);
456+
extern void erase_option_from_connstr(const char *option, char *connstr);
457+
454458
extern void ResolverMain(void);
455459
extern void ResolverInit(void);
456460
extern void ResolveTransactionsForNode(int node_id);
@@ -468,7 +472,7 @@ extern csn_t MtmDistributedTransactionSnapshot(TransactionId xid, int nodeId, no
468472
extern csn_t MtmAssignCSN(void);
469473
extern csn_t MtmSyncClock(csn_t csn);
470474
extern void MtmJoinTransaction(GlobalTransactionId* gtid, csn_t snapshot, nodemask_t participantsMask);
471-
extern MtmReplicationMode MtmGetReplicationMode(int nodeId, sig_atomic_t volatile* shutdown);
475+
extern MtmReplicationMode MtmGetReplicationMode(int nodeId);
472476
extern void MtmExecutor(void* work, size_t size);
473477
extern void MtmSend2PCMessage(MtmTransState* ts, MtmMessageCode cmd);
474478
extern void MtmSendMessage(MtmArbiterMessage* msg);

pglogical_receiver.c

Lines changed: 13 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "replication/origin.h"
4141
#include "utils/portal.h"
4242
#include "tcop/pquery.h"
43+
#include "tcop/tcopprot.h"
4344

4445
#ifdef WITH_RSOCKET
4546
#include "libpq-int.h"
@@ -55,7 +56,6 @@
5556
#define RECEIVER_SUSPEND_TIMEOUT (1*USECS_PER_SEC)
5657

5758
/* Signal handling */
58-
static volatile sig_atomic_t got_sigterm = false;
5959
static volatile sig_atomic_t got_sighup = false;
6060

6161
/* GUC variables */
@@ -75,16 +75,6 @@ static int64 fe_recvint64(char *buf);
7575

7676
void pglogical_receiver_main(Datum main_arg);
7777

78-
static void
79-
receiver_raw_sigterm(SIGNAL_ARGS)
80-
{
81-
int save_errno = errno;
82-
got_sigterm = true;
83-
if (MyProc)
84-
SetLatch(&MyProc->procLatch);
85-
errno = save_errno;
86-
}
87-
8878
static void
8979
receiver_raw_sighup(SIGNAL_ARGS)
9080
{
@@ -258,7 +248,8 @@ pglogical_receiver_main(Datum main_arg)
258248

259249
/* Register functions for SIGTERM/SIGHUP management */
260250
pqsignal(SIGHUP, receiver_raw_sighup);
261-
pqsignal(SIGTERM, receiver_raw_sigterm);
251+
// pqsignal(SIGTERM, receiver_raw_sigterm);
252+
pqsignal(SIGTERM, die);
262253

263254
MtmCreateSpillDirectory(nodeId);
264255

@@ -277,6 +268,8 @@ pglogical_receiver_main(Datum main_arg)
277268
ActivePortal->status = PORTAL_ACTIVE;
278269
ActivePortal->sourceText = "";
279270

271+
MtmWaitForExtensionCreation();
272+
280273
BgwPoolStart(&Mtm->nodes[nodeId-1].pool, worker_proc);
281274

282275
/*
@@ -305,7 +298,7 @@ pglogical_receiver_main(Datum main_arg)
305298
* In case of errors we will try to reestablish connection.
306299
* Also reconnet is forced when node is switch to recovery mode
307300
*/
308-
while (!got_sigterm)
301+
for(;;)
309302
{
310303
int count;
311304
ConnStatusType status;
@@ -317,7 +310,7 @@ pglogical_receiver_main(Datum main_arg)
317310
* Druing recovery we need to open only one replication slot from which node should receive all transactions.
318311
* Slots at other nodes should be removed
319312
*/
320-
mode = MtmGetReplicationMode(nodeId, &got_sigterm);
313+
mode = MtmGetReplicationMode(nodeId);
321314
MTM_LOG1("[STATE] Node %i: wal_receiver starts in %s mode", nodeId, MtmReplicationModeName[mode]);
322315

323316
// if (mode == REPLMODE_RECOVERY)
@@ -419,7 +412,7 @@ pglogical_receiver_main(Datum main_arg)
419412

420413
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_WAL_RECEIVER_START);
421414

422-
while (!got_sigterm)
415+
for(;;)
423416
{
424417
int rc, hdr_len;
425418
/* Wait necessary amount of time */
@@ -437,13 +430,6 @@ pglogical_receiver_main(Datum main_arg)
437430
ereport(LOG, (MTM_ERRMSG("%s: processed SIGHUP", worker_proc)));
438431
}
439432

440-
if (got_sigterm)
441-
{
442-
/* Simply exit */
443-
ereport(LOG, (MTM_ERRMSG("%s: processed SIGTERM", worker_proc)));
444-
proc_exit(0);
445-
}
446-
447433
/* Emergency bailout if postmaster has died */
448434
if (rc & WL_POSTMASTER_DEATH)
449435
{
@@ -606,12 +592,12 @@ pglogical_receiver_main(Datum main_arg)
606592
} else {
607593
if (MtmPreserveCommitOrder && buf.used == msg_len) {
608594
/* Perform commit-prepared and rollback-prepared requested directly in receiver */
609-
timestamp_t stop, start = MtmGetSystemTime();
595+
// timestamp_t stop, start = MtmGetSystemTime();
610596
MtmExecutor(buf.data, buf.used);
611-
stop = MtmGetSystemTime();
612-
if (stop - start > USECS_PER_SEC) {
613-
elog(WARNING, "Commit of prepared transaction takes %lld usec, flags=%x", stop - start, stmt[1]);
614-
}
597+
// stop = MtmGetSystemTime();
598+
// if (stop - start > USECS_PER_SEC) {
599+
// elog(WARNING, "Commit of prepared transaction takes %lld usec, flags=%x", stop - start, stmt[1]);
600+
// }
615601
} else {
616602
/* all other commits should be applied in place */
617603
// Assert(stmt[1] == PGLOGICAL_PREPARE || stmt[1] == PGLOGICAL_COMMIT || stmt[1] == PGLOGICAL_PRECOMMIT_PREPARED);

resolver.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -428,8 +428,13 @@ ResolverMain(void)
428428
/* init this worker */
429429
pqsignal(SIGTERM, die);
430430
BackgroundWorkerUnblockSignals();
431+
432+
MtmBackgroundWorker = true;
433+
431434
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL, 0);
432435

436+
MtmWaitForExtensionCreation();
437+
433438
/* init map with current unresolved transactions */
434439
ctl.keysize = GIDSIZE;
435440
ctl.entrysize = sizeof(resolver_tx);

0 commit comments

Comments
 (0)