Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 08d8da6

Browse files
committed
multidb refactor: rework output plugin hooks
1 parent c367ad6 commit 08d8da6

File tree

7 files changed

+209
-322
lines changed

7 files changed

+209
-322
lines changed

Cluster.pm

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,11 @@ sub configure
116116
my $unix_sock_dir = $ENV{PGHOST};
117117

118118
$node->append_conf("postgresql.conf", qq(
119-
log_statement = all
119+
# log_statement = all
120120
listen_addresses = '$host'
121121
unix_socket_directories = '$unix_sock_dir'
122122
port = $pgport
123-
max_prepared_transactions = 30
123+
max_prepared_transactions = 150
124124
max_connections = 10
125125
max_worker_processes = 100
126126
wal_level = logical

src/commit.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,12 +168,12 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
168168
}
169169

170170
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
171-
mtm_log(MtmTxFinish, "TXFINISH: %s precommittted", gid);
171+
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", gid);
172172
GatherPrecommits(x, participantsMask);
173173

174174
StartTransactionCommand();
175175
FinishPreparedTransaction(gid, true, false);
176-
mtm_log(MtmTxFinish, "TXFINISH: %s committted", gid);
176+
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
177177

178178
dmq_stream_unsubscribe(stream);
179179
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);

src/include/multimaster.h

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ typedef struct MtmFlushPosition
235235
lsn_t remote_end;
236236
} MtmFlushPosition;
237237

238+
typedef struct
239+
{
240+
int magic;
241+
bool is_recovery;
242+
} MtmDecoderPrivate;
238243

239244
extern char const* const MtmNodeStatusMnem[];
240245
extern char const* const MtmTxnStatusMnem[];
@@ -254,7 +259,6 @@ extern MtmConnectionInfo* MtmConnections;
254259
extern bool MtmMajorNode;
255260
extern bool MtmBackgroundWorker;
256261
extern char* MtmRefereeConnStr;
257-
extern bool MtmIsRecoverySession;
258262

259263
extern void MtmXactCallback2(XactEvent event, void *arg);
260264
extern void MtmMonitorInitialize(void);
@@ -293,24 +297,18 @@ extern void MtmResumeNode(int nodeId);
293297
extern void MtmSleep(timestamp_t interval);
294298

295299
extern void MtmSetCurrentTransactionGID(char const* gid, int node_id);
296-
297300
extern void MtmSetCurrentTransactionCSN(void);
298-
299301
extern TransactionId MtmGetCurrentTransactionId(void);
300-
extern XidStatus MtmGetCurrentTransactionStatus(void);
302+
extern void MtmResetTransaction(void);
301303

302-
extern bool MtmIsRecoveredNode(int nodeId);
303304
extern void MtmUpdateNodeConnectionInfo(MtmConnectionInfo* conn, char const* connStr);
304-
extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
305-
extern bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr);
306-
extern void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN);
307305

308306
extern void MtmHandleApplyError(void);
309307

310308
extern void MtmUpdateLsnMapping(int nodeId, lsn_t endLsn);
311309
extern lsn_t MtmGetFlushPosition(int nodeId);
312310

313-
extern void MtmResetTransaction(void);
311+
314312
extern void MtmReleaseRecoverySlot(int nodeId);
315313
extern PGconn *PQconnectdb_safe(const char *conninfo, int timeout);
316314
extern void MtmBeginSession(int nodeId);

src/multimaster.c

Lines changed: 0 additions & 274 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ MtmState* Mtm;
135135
MemoryContext MtmApplyContext;
136136
MtmConnectionInfo* MtmConnections;
137137

138-
bool MtmIsRecoverySession;
139138

140139
static dlist_head MtmLsnMapping = DLIST_STATIC_INIT(MtmLsnMapping);
141140

@@ -529,79 +528,6 @@ static int64 MtmGetSlotLag(int nodeId)
529528
}
530529

531530

532-
/*
533-
* This function is called by WAL sender when start sending new transaction.
534-
* It returns true if specified node is in recovery mode. In this case we should send to it all transactions from WAL,
535-
* not only coordinated by self node as in normal mode.
536-
*/
537-
bool MtmIsRecoveredNode(int nodeId)
538-
{
539-
if (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
540-
if (!MtmIsRecoverySession) {
541-
MTM_ELOG(ERROR, "Node %d is marked as disabled but is not in recovery mode", nodeId);
542-
}
543-
return true;
544-
} else {
545-
return false;
546-
}
547-
}
548-
549-
/*
550-
* Check if wal sender replayed all transactions from WAL log.
551-
* It can never happen if there are many active transactions.
552-
* In this case we wait until gap between sent and current position in the
553-
* WAL becomes smaller than threshold value MtmMinRecoveryLag and
554-
* after it prohibit start of new transactions until WAL is completely replayed.
555-
*/
556-
void MtmCheckRecoveryCaughtUp(int nodeId, lsn_t slotLSN)
557-
{
558-
MtmLock(LW_EXCLUSIVE);
559-
if (MtmIsRecoveredNode(nodeId)) {
560-
lsn_t walLSN = GetXLogInsertRecPtr();
561-
if (slotLSN + (long64) MtmMinRecoveryLag * 1024 > walLSN)
562-
{
563-
/*
564-
* Wal sender almost caught up.
565-
* Lock cluster preventing new transaction to start until wal is completely replayed.
566-
* We have to maintain two bitmasks: one is marking wal sender, another - correspondent nodes.
567-
* Is there some better way to establish mapping between nodes and WAL-senders?
568-
*/
569-
MTM_LOG1("Node %d is almost caught-up: slot position %llx, WAL position %llx",
570-
nodeId, slotLSN, walLSN);
571-
572-
MTM_LOG1("[LOCK] set lock on MtmCheckRecoveryCaughtUp");
573-
} else {
574-
MTM_LOG2("Continue recovery of node %d, slot position %llx, WAL position %llx,"
575-
" WAL sender position %llx, lockers %llx",
576-
nodeId, (long long unsigned int) slotLSN,
577-
(long long unsigned int) walLSN,
578-
(long long unsigned int) MyWalSnd->sentPtr,
579-
Mtm->originLockNodeMask);
580-
}
581-
}
582-
MtmUnlock();
583-
}
584-
585-
/*
586-
* Notification about node recovery completion.
587-
* If recovery is in progress and WAL sender replays all records in WAL,
588-
* then enable recovered node and send notification to it about end of recovery.
589-
*/
590-
bool MtmRecoveryCaughtUp(int nodeId, lsn_t walEndPtr)
591-
{
592-
bool caughtUp = false;
593-
MtmLock(LW_EXCLUSIVE);
594-
if (MtmIsRecoveredNode(nodeId))
595-
{
596-
MtmStateProcessNeighborEvent(nodeId, MTM_NEIGHBOR_RECOVERY_CAUGHTUP, true);
597-
caughtUp = true;
598-
MtmIsRecoverySession = false;
599-
}
600-
MtmUnlock();
601-
return caughtUp;
602-
}
603-
604-
605531
/*
606532
* -------------------------------------------
607533
* Node initialization
@@ -1492,130 +1418,6 @@ void MtmStopNode(int nodeId, bool dropSlot)
14921418
}
14931419
}
14941420

1495-
static void
1496-
MtmOnProcExit(int code, Datum arg)
1497-
{
1498-
if (MtmReplicationNodeId > 0) {
1499-
Mtm->nodes[MtmReplicationNodeId-1].senderPid = -1;
1500-
MTM_LOG1("WAL-sender to %d is terminated", MtmReplicationNodeId);
1501-
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
1502-
}
1503-
}
1504-
1505-
static void
1506-
MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1507-
{
1508-
ListCell *param;
1509-
bool recoveryCompleted = false;
1510-
ulong64 recoveryStartPos = INVALID_LSN;
1511-
int i;
1512-
1513-
MtmIsRecoverySession = false;
1514-
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
1515-
1516-
foreach(param, args->in_params)
1517-
{
1518-
DefElem *elem = lfirst(param);
1519-
if (strcmp("mtm_replication_mode", elem->defname) == 0) {
1520-
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
1521-
if (strcmp(strVal(elem->arg), "recovery") == 0) {
1522-
MtmIsRecoverySession = true;
1523-
} else if (strcmp(strVal(elem->arg), "recovered") == 0) {
1524-
recoveryCompleted = true;
1525-
} else if (strcmp(strVal(elem->arg), "open_existed") != 0 && strcmp(strVal(elem->arg), "create_new") != 0) {
1526-
MTM_ELOG(ERROR, "Illegal recovery mode %s", strVal(elem->arg));
1527-
}
1528-
} else {
1529-
MTM_ELOG(ERROR, "Replication mode is not specified");
1530-
}
1531-
} else if (strcmp("mtm_restart_pos", elem->defname) == 0) {
1532-
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
1533-
sscanf(strVal(elem->arg), "%llx", &recoveryStartPos);
1534-
} else {
1535-
MTM_ELOG(ERROR, "Restart position is not specified");
1536-
}
1537-
} else if (strcmp("mtm_recovered_pos", elem->defname) == 0) {
1538-
if (elem->arg != NULL && strVal(elem->arg) != NULL) {
1539-
ulong64 recoveredLSN;
1540-
sscanf(strVal(elem->arg), "%llx", &recoveredLSN);
1541-
MTM_LOG1("Recovered position of node %d is %llx", MtmReplicationNodeId, recoveredLSN);
1542-
// if (Mtm->nodes[MtmReplicationNodeId-1].restartLSN < recoveredLSN) {
1543-
// MTM_LOG1("Advance restartLSN for node %d from %llx to %llx (MtmReplicationStartupHook)",
1544-
// MtmReplicationNodeId, Mtm->nodes[MtmReplicationNodeId-1].restartLSN, recoveredLSN);
1545-
// // Assert(Mtm->nodes[MtmReplicationNodeId-1].restartLSN == INVALID_LSN
1546-
// // || recoveredLSN < Mtm->nodes[MtmReplicationNodeId-1].restartLSN + MtmMaxRecoveryLag);
1547-
// Mtm->nodes[MtmReplicationNodeId-1].restartLSN = recoveredLSN;
1548-
// }
1549-
} else {
1550-
MTM_ELOG(ERROR, "Recovered position is not specified");
1551-
}
1552-
}
1553-
}
1554-
MTM_LOG1("Startup of logical replication to node %d", MtmReplicationNodeId);
1555-
MtmLock(LW_EXCLUSIVE);
1556-
1557-
/*
1558-
* Set proper originId mappings.
1559-
*
1560-
* This is copypasted from receiver. Better to have normal init method
1561-
* to setup all stuff in shared memory. But seems that there is no such
1562-
* callback in vanilla pg and adding one will require some careful thought.
1563-
*/
1564-
for (i = 0; i < Mtm->nAllNodes; i++)
1565-
{
1566-
char *originName;
1567-
RepOriginId originId;
1568-
1569-
originName = psprintf(MULTIMASTER_SLOT_PATTERN, i + 1);
1570-
originId = replorigin_by_name(originName, true);
1571-
if (originId == InvalidRepOriginId) {
1572-
originId = replorigin_create(originName);
1573-
}
1574-
CommitTransactionCommand();
1575-
StartTransactionCommand();
1576-
Mtm->nodes[i].originId = originId;
1577-
}
1578-
1579-
if (BIT_CHECK(Mtm->stalledNodeMask, MtmReplicationNodeId-1)) {
1580-
MtmUnlock();
1581-
MTM_ELOG(ERROR, "Stalled node %d tries to initiate recovery", MtmReplicationNodeId);
1582-
}
1583-
1584-
if (BIT_CHECK(Mtm->stoppedNodeMask, MtmReplicationNodeId-1)) {
1585-
MtmUnlock();
1586-
MTM_ELOG(ERROR, "Stopped node %d tries to connect", MtmReplicationNodeId);
1587-
}
1588-
1589-
if (!BIT_CHECK(Mtm->clique, MtmReplicationNodeId-1)) {
1590-
MtmUnlock();
1591-
MTM_ELOG(ERROR, "Out-of-clique node %d tries to connect", MtmReplicationNodeId);
1592-
}
1593-
1594-
if (MtmIsRecoverySession) {
1595-
MTM_LOG1("%d: Node %d start recovery of node %d at position %llx", MyProcPid, MtmNodeId, MtmReplicationNodeId, recoveryStartPos);
1596-
Assert(MyReplicationSlot != NULL);
1597-
if (recoveryStartPos < MyReplicationSlot->data.restart_lsn) {
1598-
MTM_ELOG(WARNING, "Specified recovery start position %llx is beyond restart lsn %llx", recoveryStartPos, (long64)MyReplicationSlot->data.restart_lsn);
1599-
}
1600-
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERY, true);
1601-
} else { //if (BIT_CHECK(Mtm->disabledNodeMask, MtmReplicationNodeId-1)) {
1602-
if (recoveryCompleted) {
1603-
MTM_LOG1("Node %d consider that recovery of node %d is completed: start normal replication", MtmNodeId, MtmReplicationNodeId);
1604-
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERED, true);
1605-
} else {
1606-
/* Force arbiter to reestablish connection with this node, send heartbeat to inform this node that it was disabled and should perform recovery */
1607-
MtmUnlock();
1608-
MTM_ELOG(ERROR, "Disabled node %d tries to reconnect without recovery", MtmReplicationNodeId);
1609-
}
1610-
}
1611-
// else {
1612-
// // MTM_LOG1("Node %d start logical replication to node %d in normal mode", MtmNodeId, MtmReplicationNodeId);
1613-
// MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_NORMAL);
1614-
// }
1615-
1616-
MtmUnlock();
1617-
on_shmem_exit(MtmOnProcExit, 0);
1618-
}
16191421

16201422
lsn_t MtmGetFlushPosition(int nodeId)
16211423
{
@@ -1666,74 +1468,6 @@ void MtmUpdateLsnMapping(int node_id, lsn_t end_lsn)
16661468
MemoryContextSwitchTo(old_context);
16671469
}
16681470

1669-
1670-
static void
1671-
MtmReplicationShutdownHook(struct PGLogicalShutdownHookArgs* args)
1672-
{
1673-
MtmLock(LW_EXCLUSIVE);
1674-
if (MtmReplicationNodeId >= 0 && BIT_CHECK(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1)) {
1675-
BIT_CLEAR(Mtm->pglogicalSenderMask, MtmReplicationNodeId-1);
1676-
MTM_LOG1("Logical replication to node %d is stopped", MtmReplicationNodeId);
1677-
/* MtmOnNodeDisconnect(MtmReplicationNodeId); */
1678-
MtmReplicationNodeId = -1; /* defuse MtmOnProcExit hook */
1679-
}
1680-
MtmUnlock();
1681-
}
1682-
1683-
/*
1684-
* Filter transactions which should be replicated to other nodes.
1685-
* This filter is applied at sender side (WAL sender).
1686-
* Final filtering is also done at destination side by MtmFilterTransaction function.
1687-
*/
1688-
static bool
1689-
MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1690-
{
1691-
/* Do not replicate any transactions in recovery mode (because we should apply
1692-
* changes sent to us rather than send our own pending changes)
1693-
* and transactions received from other nodes
1694-
* (originId should be non-zero in this case)
1695-
* unless we are performing recovery of disabled node
1696-
* (in this case all transactions should be sent)
1697-
*/
1698-
/*
1699-
* I removed (Mtm->status != MTM_RECOVERY) here since in major
1700-
* mode we need to recover from offline node too. Also it seems
1701-
* that with amount of nodes >= 3 we also need that. --sk
1702-
*
1703-
* On a first look this works fine.
1704-
*/
1705-
bool res = (args->origin_id == InvalidRepOriginId
1706-
|| MtmIsRecoveredNode(MtmReplicationNodeId));
1707-
if (!res) {
1708-
MTM_LOG2("Filter transaction with origin_id=%d", args->origin_id);
1709-
}
1710-
return res;
1711-
}
1712-
1713-
/*
1714-
* Filter record corresponding to local (non-distributed) tables
1715-
*/
1716-
static bool
1717-
MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
1718-
{
1719-
bool isDistributed;
1720-
1721-
/*
1722-
* We have several built-in local tables that shouldn't be replicated.
1723-
* It is hard to insert them into MtmLocalTables properly on extension
1724-
* creation so we just list them here.
1725-
*/
1726-
if (strcmp(args->changed_rel->rd_rel->relname.data, "referee_decision") == 0)
1727-
return false;
1728-
1729-
/*
1730-
* Check in shared hash of local tables.
1731-
*/
1732-
isDistributed = !MtmIsRelationLocal(args->changed_rel);
1733-
1734-
return isDistributed;
1735-
}
1736-
17371471
/*
17381472
* Filter received transactions at destination side.
17391473
* This function is executed by receiver,
@@ -1817,14 +1551,6 @@ bool MtmFilterTransaction(char* record, int size)
18171551
return duplicate;
18181552
}
18191553

1820-
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
1821-
{
1822-
hooks->startup_hook = MtmReplicationStartupHook;
1823-
hooks->shutdown_hook = MtmReplicationShutdownHook;
1824-
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
1825-
hooks->row_filter_hook = MtmReplicationRowFilterHook;
1826-
}
1827-
18281554
/*
18291555
* Setup replication session origin to include origin location in WAL and
18301556
* update slot position.

src/pglogical_apply.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -775,7 +775,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
775775
} else {
776776
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
777777
}
778-
mtm_log(MtmTxFinish, "TXFINISH: %s precommittted", gid);
778+
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", gid);
779779

780780
// MtmPrecommitTransaction(gid);
781781

0 commit comments

Comments
 (0)