Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6f0d2c9

Browse files
committed
log ddl at destination too; use unique session_id in parallel-safe message
1 parent fbb98ce commit 6f0d2c9

11 files changed

+109
-30
lines changed

src/ddl.c

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "access/relscan.h"
3636
#include "commands/vacuum.h"
3737
#include "utils/inval.h"
38+
#include "replication/origin.h"
3839
#include "miscadmin.h"
3940

4041
#include "mm.h"
@@ -1048,10 +1049,15 @@ MtmExecutorFinish(QueryDesc *queryDesc)
10481049

10491050

10501051
void
1051-
MtmApplyDDLMessage(const char *messageBody)
1052+
MtmApplyDDLMessage(const char *messageBody, bool transactional)
10521053
{
10531054
int rc;
10541055

1056+
/* Write DDL to our WAL in case smbd going to recover from us */
1057+
Assert(replorigin_session_origin != InvalidRepOriginId);
1058+
LogLogicalMessage(transactional ? "D" : "C",
1059+
messageBody, strlen(messageBody) + 1, transactional);
1060+
10551061
mtm_log(DMLStmtIncoming, "%d: Executing utility statement %s",
10561062
MyProcPid, messageBody);
10571063

@@ -1083,6 +1089,8 @@ MtmApplyDDLMessage(const char *messageBody)
10831089
MemoryContextSwitchTo(MtmApplyContext);
10841090
PushActiveSnapshot(GetTransactionSnapshot());
10851091

1092+
// XXX: assert that was non-transactional ddl
1093+
10861094
switch (nodeTag(MtmCapturedDDL))
10871095
{
10881096
case T_VacuumStmt:

src/include/ddl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ extern void MtmDDLReplicationInit(void);
2323
extern void MtmDDLReplicationShmemStartup(void);
2424
extern bool MtmIsRelationLocal(Relation rel);
2525
extern void MtmDDLResetStatement(void);
26-
extern void MtmApplyDDLMessage(const char *messageBody);
26+
extern void MtmApplyDDLMessage(const char *messageBody, bool transactional);
2727
extern void MtmDDLResetApplyState(void);
2828
extern void MtmSetRemoteFunction(char const* list, void* extra);
2929
extern void MtmToggleDML(void);

src/include/mm.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ typedef struct
5050
int node_id;
5151
bool is_recovery;
5252
bool parallel_allowed;
53+
TimestampTz session_id;
5354
} MtmReceiverContext;
5455

5556
/* XXX: drop that */

src/include/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,6 @@ extern void MtmUpdateControlFile(void);
193193

194194
extern void MtmCheckSlots(void);
195195

196+
extern TimestampTz MtmGetIncreasingTimestamp(void);
197+
196198
#endif

src/include/pglogical_output.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ typedef struct MtmDecoderPrivate
4545
{
4646
int magic; // XXX
4747
bool is_recovery;
48-
int node_id;
48+
int64 session_id;
4949
} MtmDecoderPrivate;
5050

5151
typedef struct PGLogicalOutputData

src/multimaster.c

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ typedef enum
4646
MTM_STATE_LOCK_ID
4747
} MtmLockIds;
4848

49+
50+
typedef struct
51+
{
52+
TimestampTz last_timestamp;
53+
slock_t mutex;
54+
} MtmTime;
55+
56+
static MtmTime *mtm_time;
57+
4958
#define Natts_mtm_nodes_state 17
5059
#define Natts_mtm_cluster_state 20
5160
typedef struct
@@ -201,6 +210,24 @@ void MtmUnlockNode(int nodeId)
201210
* -------------------------------------------
202211
*/
203212

213+
TimestampTz
214+
MtmGetIncreasingTimestamp()
215+
{
216+
TimestampTz now = GetCurrentTimestamp();
217+
218+
/*
219+
* Don't let time move backward; if it hasn't advanced, use incremented
220+
* last value.
221+
*/
222+
SpinLockAcquire(&mtm_time->mutex);
223+
if (now <= mtm_time->last_timestamp)
224+
now = ++mtm_time->last_timestamp;
225+
else
226+
mtm_time->last_timestamp = now;
227+
SpinLockRelease(&mtm_time->mutex);
228+
229+
return now;
230+
}
204231

205232
timestamp_t MtmGetSystemTime(void)
206233
{
@@ -524,6 +551,14 @@ static void MtmInitialize()
524551
int i;
525552

526553
LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
554+
555+
mtm_time = (MtmTime *) ShmemInitStruct("mtm_time", sizeof(MtmTime), &found);
556+
if (!found)
557+
{
558+
mtm_time->last_timestamp = 0;
559+
SpinLockInit(&mtm_time->mutex);
560+
}
561+
527562
Mtm = (MtmState*)ShmemInitStruct(MULTIMASTER_NAME, sizeof(MtmState) + sizeof(MtmNodeInfo)*(MtmMaxNodes-1), &found);
528563
if (!found)
529564
{
@@ -1152,7 +1187,7 @@ _PG_init(void)
11521187
* the postmaster process.) We'll allocate or attach to the shared
11531188
* resources in mtm_shmem_startup().
11541189
*/
1155-
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize);
1190+
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize + sizeof(MtmTime));
11561191
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2 + 1);
11571192

11581193
MtmMonitorInitialize();

src/pglogical_apply.c

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,8 @@ process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
502502
char const* messageBody = pq_getmsgbytes(s, messageSize);
503503
bool standalone = false;
504504

505+
MtmBeginSession(MtmReplicationNodeId);
506+
505507
switch (action)
506508
{
507509
case 'C':
@@ -510,15 +512,15 @@ process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
510512
SetCurrentStatementStartTimestamp();
511513
MtmResetTransaction();
512514
StartTransactionCommand();
513-
MtmApplyDDLMessage(messageBody);
515+
MtmApplyDDLMessage(messageBody, false);
514516
CommitTransactionCommand(); // XXX
515517
standalone = true;
516518
break;
517519
}
518520
case 'D':
519521
{
520522
mtm_log(MtmApplyMessage, "Executing tx DDL message %s", messageBody);
521-
MtmApplyDDLMessage(messageBody);
523+
MtmApplyDDLMessage(messageBody, true);
522524
break;
523525
}
524526
case 'L':
@@ -530,19 +532,19 @@ process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
530532
}
531533
case 'P':
532534
{
533-
int node_id = -1;
535+
int64 session_id = 0;
534536

535-
sscanf(messageBody, "%d", &node_id);
537+
sscanf(messageBody, INT64_FORMAT, &session_id);
536538

537-
Assert(node_id > 0);
539+
Assert(session_id > 0);
538540
// XXX assert that it is receiver itself
539541

540-
if (!receiver_ctx->is_recovery && node_id == MtmNodeId)
542+
if (receiver_ctx->session_id == session_id)
541543
{
544+
Assert(!receiver_ctx->is_recovery);
542545
Assert(!receiver_ctx->parallel_allowed);
543546
Assert(receiver_ctx->node_id > 0);
544547
Assert(receiver_ctx->node_id == MtmReplicationNodeId);
545-
Assert(receiver_ctx->node_id != node_id);
546548

547549
receiver_ctx->parallel_allowed = true;
548550
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_RECEIVER_START, false);
@@ -553,6 +555,9 @@ process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
553555
default:
554556
Assert(false);
555557
}
558+
559+
MtmEndSession(MtmReplicationNodeId, false);
560+
556561
return standalone;
557562
}
558563

src/pglogical_proto.c

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
707707

708708
hooks_data = (MtmDecoderPrivate *) palloc0(sizeof(MtmDecoderPrivate));
709709
args->private_data = hooks_data;
710-
hooks_data->magic = 42042042;
710+
hooks_data->session_id = 0;
711711

712712
Mtm->nodes[MtmReplicationNodeId-1].senderPid = MyProcPid;
713713

@@ -726,10 +726,8 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
726726
{
727727
hooks_data->is_recovery = false;
728728
}
729-
else if (strcmp(strVal(elem->arg), "open_existed") != 0 &&
730-
strcmp(strVal(elem->arg), "create_new") != 0)
729+
else
731730
{
732-
// XXX: don't need that anymore
733731
mtm_log(ERROR, "Illegal recovery mode %s", strVal(elem->arg));
734732
}
735733
}
@@ -738,8 +736,28 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
738736
mtm_log(ERROR, "Replication mode is not specified");
739737
}
740738
}
739+
else if (strcmp("mtm_session_id", elem->defname) == 0)
740+
{
741+
if (elem->arg != NULL && strVal(elem->arg) != NULL)
742+
{
743+
int64 session_id = 0;
744+
sscanf(strVal(elem->arg), INT64_FORMAT, &session_id);
745+
746+
if (session_id == 0)
747+
mtm_log(ERROR, "Illegal mtm_session_id");
748+
749+
hooks_data->session_id = session_id;
750+
}
751+
else
752+
{
753+
mtm_log(ERROR, "mtm_session_id is not specified");
754+
}
755+
}
741756
}
742757

758+
if (hooks_data->session_id == 0)
759+
mtm_log(ERROR, "mtm_session_id is not specified");
760+
743761
/*
744762
* Set proper originId mappings.
745763
*
@@ -814,11 +832,11 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
814832
if (!hooks_data->is_recovery)
815833
{
816834
XLogRecPtr msg_xptr;
817-
char *dest_id = psprintf("%d", MtmReplicationNodeId);
835+
char *session_id = psprintf(INT64_FORMAT, hooks_data->session_id);
818836

819837
LWLockAcquire(MtmCommitBarrier, LW_EXCLUSIVE);
820838
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERED, false);
821-
msg_xptr = LogLogicalMessage("P", dest_id, strlen(dest_id) + 1, false);
839+
msg_xptr = LogLogicalMessage("P", session_id, strlen(session_id) + 1, false);
822840
LWLockRelease(MtmCommitBarrier);
823841

824842
XLogFlush(msg_xptr);

src/pglogical_receiver.c

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ pglogical_receiver_main(Datum main_arg)
446446
PGconn *conn;
447447
PGresult *res;
448448
MtmReplicationMode mode;
449-
MtmReceiverContext receiver_ctx = {nodeId, false, false};
449+
MtmReceiverContext receiver_ctx = {nodeId, false, false, 0};
450450

451451
ByteBuffer buf;
452452
/* Buffer for COPY data */
@@ -590,14 +590,24 @@ pglogical_receiver_main(Datum main_arg)
590590
Mtm->nodes[nodeId-1].manualRecovery = false;
591591
}
592592

593-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x (\"startup_params_format\" '1', \"max_proto_version\" '%d', \"min_proto_version\" '%d', \"forward_changesets\" '1', \"mtm_replication_mode\" '%s', \"mtm_restart_pos\" '%llx')",
593+
receiver_ctx.session_id = MtmGetIncreasingTimestamp();
594+
receiver_ctx.is_recovery = mode == REPLMODE_RECOVERY;
595+
receiver_ctx.parallel_allowed = false;
596+
597+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %x/%x ("
598+
"\"startup_params_format\" '1',"
599+
"\"max_proto_version\" '%d',"
600+
"\"min_proto_version\" '%d',"
601+
"\"forward_changesets\" '1',"
602+
"\"mtm_replication_mode\" '%s',"
603+
"\"mtm_session_id\" '"INT64_FORMAT"')",
594604
slotName,
595605
(uint32) (originStartPos >> 32),
596606
(uint32) originStartPos,
597607
MULTIMASTER_MAX_PROTO_VERSION,
598608
MULTIMASTER_MIN_PROTO_VERSION,
599609
MtmReplicationModeName[mode],
600-
originStartPos
610+
receiver_ctx.session_id
601611
);
602612
res = PQexec(conn, query->data);
603613
if (PQresultStatus(res) != PGRES_COPY_BOTH)
@@ -608,9 +618,6 @@ pglogical_receiver_main(Datum main_arg)
608618
PQclear(res);
609619
resetPQExpBuffer(query);
610620

611-
receiver_ctx.is_recovery = mode == REPLMODE_RECOVERY;
612-
receiver_ctx.parallel_allowed = false;
613-
614621
for(;;)
615622
{
616623
int rc, hdr_len;

tests_testgres/ddl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ class TestDDL(unittest.TestCase):
1212
def setUpClass(cls):
1313
cls.cluster = Cluster(NUM_NODES)
1414
cls.cluster.print_conninfo()
15-
cls.cluster.start().await_online((0,1,2))
15+
cls.cluster.start().install().await_online((0,1,2))
1616
# cls.cluster.print_conninfo()
1717

1818
@classmethod

tests_testgres/mm_cluster.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
# Execute this file in order to run 3-node mm cluster
77

88
import time
9+
import os
910

1011
from testgres import PostgresNode, NodeStatus, NodeBackup
1112
from testgres import reserve_port, release_port
@@ -135,11 +136,10 @@ def print_conninfo(self):
135136
self.username))
136137

137138
def install(self):
138-
self.node_any().poll_query_until(dbname=self.dbname,
139-
username=self.username,
140-
query="create extension multimaster",
141-
raise_programming_error=False,
142-
expected=None)
139+
for node in self.nodes:
140+
node.safe_psql("create extension multimaster;",
141+
dbname=self.dbname,
142+
username=self.username)
143143

144144
return self
145145

@@ -262,9 +262,12 @@ def __init__(self,
262262
use_logging=False,
263263
master=None):
264264

265+
if base_dir is None:
266+
base_dir = os.path.dirname(os.path.abspath(__file__))
267+
265268
super(ClusterNode, self).__init__(name=name,
266269
port=pg_port,
267-
base_dir=base_dir)
270+
base_dir=base_dir + '/../tmp_check/' + name)
268271

269272
self.mm_port = mm_port
270273

0 commit comments

Comments
 (0)