Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ca43925

Browse files
committed
Introduce syncpoints instead of replication session
1 parent fb411c8 commit ca43925

17 files changed

+902
-73
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ OBJS = src/multimaster.o src/dmq.o src/commit.o src/bytebuf.o src/bgwpool.o \
44
src/pglogical_output.o src/pglogical_proto.o src/pglogical_receiver.o \
55
src/pglogical_apply.o src/pglogical_hooks.o src/pglogical_config.o \
66
src/pglogical_relid_map.o src/ddd.o src/bkb.o src/spill.o src/state.o \
7-
src/resolver.o src/ddl.o
7+
src/resolver.o src/ddl.o src/syncpoint.o
88
MODULE_big = multimaster
99

1010
ifdef USE_PGXS

multimaster--1.0.sql

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,23 @@ CREATE FUNCTION mtm.check_deadlock(xid bigint) RETURNS boolean
137137
AS 'MODULE_PATHNAME','mtm_check_deadlock'
138138
LANGUAGE C;
139139

140-
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema name, rel_name name, primary key(rel_schema, rel_name));
140+
CREATE TABLE mtm.local_tables(
141+
rel_schema name,
142+
rel_name name,
143+
primary key(rel_schema, rel_name)
144+
);
145+
146+
CREATE TABLE mtm.referee_decision(
147+
key text primary key not null,
148+
node_id int
149+
);
141150

142-
CREATE TABLE mtm.referee_decision(key text primary key not null, node_id int);
151+
CREATE TABLE mtm.syncpoints(
152+
node_id int not null,
153+
origin_lsn bigint not null,
154+
local_lsn bigint not null,
155+
primary key(node_id, origin_lsn)
156+
);
143157

144158
CREATE OR REPLACE FUNCTION mtm.alter_sequences() RETURNS boolean AS
145159
$$

src/bgwpool.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
#include "mm.h"
1919
#include "logger.h"
2020

21+
bool MtmIsPoolWorker;
22+
2123
bool MtmIsLogicalReceiver;
2224
int MtmMaxWorkers;
2325

@@ -50,6 +52,9 @@ BgwPoolMainLoop(BgwPool* pool)
5052

5153
mtm_log(BgwPoolEvent, "Start background worker %d, shutdown=%d", MyProcPid, pool->shutdown);
5254

55+
MtmIsPoolWorker = true;
56+
57+
// XXX: get rid of that
5358
MtmBackgroundWorker = true;
5459
MtmIsLogicalReceiver = true;
5560
MtmPool = pool;
@@ -74,6 +79,8 @@ BgwPoolMainLoop(BgwPool* pool)
7479
}
7580

7681
PGSemaphoreLock(pool->available);
82+
83+
// XXX: change to LWLock
7784
SpinLockAcquire(&pool->lock);
7885
if (pool->shutdown)
7986
{
@@ -123,6 +130,8 @@ BgwPoolMainLoop(BgwPool* pool)
123130
pool->active -= 1;
124131
pool->lastPeakTime = 0;
125132
SpinLockRelease(&pool->lock);
133+
134+
ConditionVariableBroadcast(&pool->syncpoint_cv);
126135
}
127136

128137
SpinLockRelease(&pool->lock);
@@ -144,6 +153,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
144153
PGSemaphoreReset(pool->available);
145154
PGSemaphoreReset(pool->overflow);
146155
SpinLockInit(&pool->lock);
156+
ConditionVariableInit(&pool->syncpoint_cv);
147157
pool->shutdown = false;
148158
pool->producerBlocked = false;
149159
pool->head = 0;

src/commit.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "logger.h"
2323
#include "ddl.h"
2424
#include "state.h"
25+
#include "syncpoint.h"
2526

2627
static Oid MtmDatabaseId;
2728
static bool DmqSubscribed;
@@ -210,6 +211,8 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
210211
dmq_stream_unsubscribe(stream);
211212
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);
212213

214+
MaybeLogSyncpoint();
215+
213216
return true;
214217
}
215218

src/include/bgwpool.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "storage/spin.h"
66
#include "storage/pg_sema.h"
77
#include "postmaster/bgworker.h"
8+
#include "storage/condition_variable.h"
89
#include "bkb.h" // XXX
910

1011
#include "mm.h"
@@ -30,6 +31,7 @@ typedef struct
3031
volatile slock_t lock;
3132
PGSemaphore available;
3233
PGSemaphore overflow;
34+
ConditionVariable syncpoint_cv;
3335
size_t head;
3436
size_t tail;
3537
size_t size;

src/include/logger.h

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,20 @@ typedef enum MtmLogTag
6060
ProtoTraceState = LOG,
6161

6262
/* receiver */
63-
MtmReceiverMode = LOG,
64-
MtmReceiverFilter = LOG,
63+
MtmReceiverStart = LOG,
64+
MtmReceiverFilter = DEBUG2,
6565
MtmApplyMessage = LOG,
6666
MtmApplyTrace = DEBUG2,
6767
MtmApplyError = LOG,
6868
MtmApplyBgwFinish = LOG,
6969

7070
/* state */
7171
MtmStateSwitch = LOG,
72-
MtmStateMessage = LOG
72+
MtmStateMessage = LOG,
73+
74+
/* syncpoints */
75+
SyncpointCreated = LOG,
76+
SyncpointApply = LOG,
7377
} MtmLogTag;
7478

7579
// XXX: also meaningful process name would be cool

src/include/mm.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ typedef struct
5151
bool is_recovery;
5252
bool parallel_allowed;
5353
TimestampTz session_id;
54+
// XXX
55+
XLogRecPtr end_lsn;
5456
} MtmReceiverContext;
5557

5658
/* XXX: drop that */
@@ -66,6 +68,9 @@ typedef ulong64 nodemask_t;
6668
#define ALL_BITS ((nodemask_t)~0)
6769

6870
extern bool MtmIsLogicalReceiver;
71+
extern bool MtmIsReceiver;
72+
extern bool MtmIsPoolWorker;
73+
6974
extern bool MtmBackgroundWorker;
7075
extern int MtmMaxNodes;
7176
extern int MtmNodeId;

src/include/multimaster.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#define MULTIMASTER_NAME "multimaster"
1717
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
18+
#define MULTIMASTER_RECOVERY_SLOT_PATTERN "mtm_recovery_slot_%d"
1819
#define MULTIMASTER_MIN_PROTO_VERSION 1
1920
#define MULTIMASTER_MAX_PROTO_VERSION 1
2021
#define MULTIMASTER_MAX_GID_SIZE 42
@@ -27,6 +28,7 @@
2728
#define MULTIMASTER_ADMIN "mtm_admin"
2829
#define MULTIMASTER_PRECOMMITTED "precommitted"
2930
#define MULTIMASTER_PREABORTED "preaborted"
31+
/*
 * Syncpoint interval: 10 MiB (presumably the amount of WAL between two
 * syncpoints -- confirm against the users of this macro).
 * The expansion is parenthesized so it behaves as a single operand when
 * used inside larger expressions such as `x % MULTIMASTER_SYNCPOINT_INTERVAL`
 * or `y / MULTIMASTER_SYNCPOINT_INTERVAL`.
 */
#define MULTIMASTER_SYNCPOINT_INTERVAL (10*1024*1024)
3032

3133
#define MULTIMASTER_DEFAULT_ARBITER_PORT 5433
3234

@@ -128,6 +130,7 @@ typedef struct
128130
bool extension_created;
129131
bool stop_new_commits;
130132
bool recovered;
133+
XLogRecPtr latestSyncpoint;
131134
MtmNodeStatus status; /* Status of this node */
132135
char *statusReason; /* A human-readable description of why the current status was set */
133136
int recoverySlot; /* NodeId of recovery slot or 0 if none */
@@ -164,6 +167,7 @@ extern char* MtmRefereeConnStr;
164167

165168
extern LWLock *MtmCommitBarrier;
166169
extern LWLock *MtmReceiverBarrier;
170+
extern LWLock *MtmSyncpointLock;
167171

168172
extern void MtmXactCallback2(XactEvent event, void *arg);
169173
extern bool MtmIsUserTransaction(void);

src/include/pglogical_proto.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2222
typedef void (*pglogical_write_begin_fn)(StringInfo out, struct PGLogicalOutputData *data,
2323
ReorderBufferTXN *txn);
2424
typedef void (*pglogical_write_message_fn)(StringInfo out, LogicalDecodingContext *ctx,
25+
XLogRecPtr end_lsn,
2526
const char *prefix, Size sz, const char *message);
2627
typedef void (*pglogical_write_commit_fn)(StringInfo out, struct PGLogicalOutputData *data,
2728
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);

src/include/syncpoint.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* syncpoint.h
4+
*
5+
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
6+
* Portions Copyright (c) 1994, Regents of the University of California
7+
*
8+
*-------------------------------------------------------------------------
9+
*/
10+
#ifndef SYNCPOINT_H
11+
#define SYNCPOINT_H
12+
13+
#include "access/xlogdefs.h"
14+
#include "libpq-fe.h"
15+
#include "utils/hsearch.h"
16+
#include "replication/walsender.h"
17+
18+
typedef struct
19+
{
20+
XLogRecPtr origin_lsn;
21+
XLogRecPtr local_lsn;
22+
} Syncpoint;
23+
24+
typedef struct
25+
{
26+
int64 node_id;
27+
XLogRecPtr origin_lsn;
28+
} FilterEntry;
29+
30+
extern void MaybeLogSyncpoint(void);
31+
extern void SyncpointRegister(int node_id, XLogRecPtr origin_lsn,
32+
XLogRecPtr local_lsn, XLogRecPtr trim_lsn);
33+
extern Syncpoint SyncpointGetLatest(int node_id);
34+
extern Syncpoint *SyncpointGetAllLatest(void);
35+
extern XLogRecPtr QueryRecoveryHorizon(PGconn *conn, int node_id, Syncpoint *local_spvector);
36+
extern HTAB *RecoveryFilterLoad(int filter_node_id, Syncpoint *spvector);
37+
38+
#endif /* SYNCPOINT_H */

src/multimaster.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ static TransactionManager MtmTM =
125125

126126
LWLock *MtmCommitBarrier;
127127
LWLock *MtmReceiverBarrier;
128+
LWLock *MtmSyncpointLock;
128129

129130
bool MtmDoReplication;
130131
char* MtmDatabaseName;
@@ -581,6 +582,7 @@ static void MtmInitialize()
581582
Mtm->pglogicalSenderMask = 0;
582583
Mtm->recoveryCount = 0;
583584
Mtm->localTablesHashLoaded = false;
585+
Mtm->latestSyncpoint = InvalidXLogRecPtr;
584586

585587
for (i = 0; i < MtmMaxNodes; i++)
586588
{
@@ -595,6 +597,9 @@ static void MtmInitialize()
595597
}
596598
Mtm->nodes[MtmNodeId-1].originId = DoNotReplicateId;
597599
/* All transaction originated from the current node should be ignored during recovery */
600+
601+
602+
// XXX: not used anymore
598603
Mtm->nodes[MtmNodeId-1].restartLSN = (lsn_t)PG_UINT64_MAX;
599604

600605
MtmTx.xid = InvalidTransactionId;
@@ -604,6 +609,7 @@ static void MtmInitialize()
604609

605610
MtmCommitBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[MtmMaxNodes*2+1].lock);
606611
MtmReceiverBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[MtmMaxNodes*2+2].lock);
612+
MtmSyncpointLock = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[MtmMaxNodes*2+3].lock);
607613

608614
MtmDoReplication = true;
609615
TM = &MtmTM;
@@ -1191,7 +1197,7 @@ _PG_init(void)
11911197
* resources in mtm_shmem_startup().
11921198
*/
11931199
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize + sizeof(MtmTime));
1194-
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2 + 2);
1200+
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2 + 3);
11951201

11961202
MtmMonitorInitialize();
11971203

0 commit comments

Comments
 (0)