Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f695a4e

Browse files
committed
Allow parallel apply at receiver only for txes that include receiver
node as a participant. Otherwise receiver may reorder prepares and corresponding commits as coordinator doesn't wait for confirmation after prepare from this receiver. As a consequence we don't need to rely on Mtm->state in MtmExecute.
1 parent 7bdaa1f commit f695a4e

15 files changed

+132
-31
lines changed

src/bgwpool.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
9393
pool->lastPeakTime = 0;
9494
}
9595
SpinLockRelease(&pool->lock);
96-
pool->executor(work, size);
96+
pool->executor(work, size, NULL);
9797
pfree(work);
9898
SpinLockAcquire(&pool->lock);
9999
pool->active -= 1;
@@ -209,7 +209,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
209209
* Size of work is larger than size of shared buffer:
210210
* run it immediately
211211
*/
212-
pool->executor(work, size);
212+
pool->executor(work, size, NULL);
213213
return;
214214
}
215215

src/commit.c

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,21 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
150150

151151
x->xid = xid;
152152

153+
/*
154+
* This lock is taken for a quite a long period of time but normally
155+
* all callers lock it in shared mode, so it shouldn't be noticeable
156+
* performance-wise.
157+
*
158+
* It is only used during startup of WalSender(node_id) in recovered mode
159+
* to create a barrier after which all transactions doing our 3PC are
160+
* guaranted to have seen participantsMask with node_id enabled, so the
161+
* receiver can apply them in parallel and be sure that precommit will
162+
* not happens before node_id applies prepare.
163+
*
164+
* See also comments at the end of MtmReplicationStartupHook().
165+
*/
166+
LWLockAcquire(MtmCommitBarrier, LW_SHARED);
167+
153168
MtmLock(LW_SHARED);
154169
participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) &
155170
~Mtm->disabledNodeMask &
@@ -187,6 +202,8 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
187202
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
188203
GatherPrecommits(x, participantsMask, MSG_COMMITTED);
189204

205+
LWLockRelease(MtmCommitBarrier);
206+
190207
dmq_stream_unsubscribe(stream);
191208
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);
192209

src/dmq.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -856,6 +856,8 @@ _pq_getmessage_if_avalable(StringInfo s)
856856
recv_bytes += rc;
857857
Assert(recv_bytes >= read_bytes && recv_bytes <= DMQ_RECV_BUFFER);
858858

859+
mtm_log(DmqTraceIncoming, "dmq: got %d bytes", rc);
860+
859861
/*
860862
* Here we need to re-check for full message again, so the caller will know
861863
* whether he should wait for event on socket.
@@ -881,7 +883,7 @@ _pq_getbyte_if_available(unsigned char *c)
881883
int rc;
882884

883885
/*
884-
* That is why we re-implementing this function: byte ccan be already in
886+
* That is why we re-implementing this function: byte can be already in
885887
* our recv buffer, so pqcomm version will miss it.
886888
*/
887889
if (recv_bytes > read_bytes)

src/include/bgwpool.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
#include "storage/s_lock.h"
55
#include "storage/spin.h"
66
#include "storage/pg_sema.h"
7-
#include "bkb.h"
7+
#include "bkb.h" // XXX
88

9-
typedef void(*BgwPoolExecutor)(void* work, size_t size);
9+
#include "mm.h"
10+
11+
typedef void(*BgwPoolExecutor)(void* work, size_t size, MtmReceiverContext *ctx);
1012

1113
typedef long timestamp_t;
1214

src/include/mm.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ typedef struct MtmSeqPosition
4545
int64 next;
4646
} MtmSeqPosition;
4747

48+
typedef struct
49+
{
50+
int node_id;
51+
bool is_recovery;
52+
bool parallel_allowed;
53+
} MtmReceiverContext;
54+
4855
/* XXX: drop that */
4956
typedef long long long64; /* we are not using int64 here because we want to use %lld format for this type */
5057
typedef unsigned long long ulong64; /* we are not using uint64 here because we want to use %lld format for this type */

src/include/multimaster.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ extern bool MtmMajorNode;
160160
extern bool MtmBackgroundWorker;
161161
extern char* MtmRefereeConnStr;
162162

163+
extern LWLock *MtmCommitBarrier;
164+
163165
extern void MtmXactCallback2(XactEvent event, void *arg);
164166
extern bool MtmIsUserTransaction(void);
165167
extern void MtmGenerateGid(char *gid, TransactionId xid);

src/include/pglogical_output.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ typedef struct HookFuncName
4343

4444
typedef struct MtmDecoderPrivate
4545
{
46-
int magic;
47-
bool is_recovery;
46+
int magic; // XXX
47+
bool is_recovery;
48+
int node_id;
4849
} MtmDecoderPrivate;
4950

5051
typedef struct PGLogicalOutputData

src/include/receiver.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
#ifndef MTM_RECEIVER_H
2+
#define MTM_RECEIVER_H
3+
14
#include "multimaster.h"
25
#include "mm.h"
36

47
extern void MtmStartReceivers(void);
58
extern void MtmStartReceiver(int nodeId, bool dynamic);
69

7-
extern void MtmExecutor(void* work, size_t size);
10+
extern void MtmExecutor(void* work, size_t size, MtmReceiverContext *rctx);
811
extern void MtmUpdateLsnMapping(int node_id, lsn_t end_lsn);
912

1013
extern void MtmBeginSession(int nodeId);
1114
extern void MtmEndSession(int nodeId, bool unlock);
15+
16+
#endif

src/multimaster.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ static TransactionManager MtmTM =
114114
MtmResumeTransaction
115115
};
116116

117+
LWLock *MtmCommitBarrier;
118+
117119
bool MtmDoReplication;
118120
char* MtmDatabaseName;
119121
char* MtmDatabaseUser;
@@ -530,7 +532,6 @@ static void MtmInitialize()
530532
Mtm->status = MTM_DISABLED; //MTM_INITIALIZATION;
531533
Mtm->recoverySlot = 0;
532534
Mtm->locks = GetNamedLWLockTranche(MULTIMASTER_NAME);
533-
534535
Mtm->nAllNodes = MtmNodes;
535536
Mtm->disabledNodeMask = (((nodemask_t)1 << MtmNodes) - 1);
536537
Mtm->clique = (((nodemask_t)1 << Mtm->nAllNodes) - 1); //0;
@@ -563,6 +564,8 @@ static void MtmInitialize()
563564

564565
RegisterXactCallback(MtmXactCallback2, NULL);
565566

567+
MtmCommitBarrier = &(GetNamedLWLockTranche(MULTIMASTER_NAME)[MtmMaxNodes*2+1].lock);
568+
566569
MtmDoReplication = true;
567570
TM = &MtmTM;
568571
LWLockRelease(AddinShmemInitLock);
@@ -1149,7 +1152,7 @@ _PG_init(void)
11491152
* resources in mtm_shmem_startup().
11501153
*/
11511154
RequestAddinShmemSpace(MTM_SHMEM_SIZE + MtmMaxNodes*MtmQueueSize);
1152-
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2);
1155+
RequestNamedLWLockTranche(MULTIMASTER_NAME, 1 + MtmMaxNodes*2 + 1);
11531156

11541157
MtmMonitorInitialize();
11551158

src/pglogical_apply.c

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
8585
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
8686

8787
static bool process_remote_begin(StringInfo s, GlobalTransactionId *gtid);
88-
static bool process_remote_message(StringInfo s);
88+
static bool process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx);
8989
static void process_remote_commit(StringInfo s, GlobalTransactionId *current_gtid);
9090
static void process_remote_insert(StringInfo s, Relation rel);
9191
static void process_remote_update(StringInfo s, Relation rel);
@@ -495,7 +495,7 @@ process_remote_begin(StringInfo s, GlobalTransactionId *gtid)
495495
}
496496

497497
static bool
498-
process_remote_message(StringInfo s)
498+
process_remote_message(StringInfo s, MtmReceiverContext *receiver_ctx)
499499
{
500500
char action = pq_getmsgbyte(s);
501501
int messageSize = pq_getmsgint(s, 4);
@@ -528,6 +528,23 @@ process_remote_message(StringInfo s)
528528
standalone = true;
529529
break;
530530
}
531+
case 'P':
532+
{
533+
int node_id = -1;
534+
535+
sscanf(messageBody, "%d", &node_id);
536+
537+
Assert(node_id > 0);
538+
Assert(receiver_ctx != NULL);
539+
// XXX assert that it is receiver itself
540+
541+
if (!receiver_ctx->is_recovery && node_id == MtmNodeId)
542+
{
543+
Assert(!receiver_ctx->parallel_allowed);
544+
receiver_ctx->parallel_allowed = true;
545+
}
546+
break;
547+
}
531548
default:
532549
Assert(false);
533550
}
@@ -1229,7 +1246,8 @@ process_remote_delete(StringInfo s, Relation rel)
12291246
CommandCounterIncrement();
12301247
}
12311248

1232-
void MtmExecutor(void* work, size_t size)
1249+
void
1250+
MtmExecutor(void* work, size_t size, MtmReceiverContext *receiver_ctx)
12331251
{
12341252
StringInfoData s;
12351253
Relation rel = NULL;
@@ -1342,7 +1360,7 @@ void MtmExecutor(void* work, size_t size)
13421360
{
13431361
close_rel(rel);
13441362
rel = NULL;
1345-
inside_transaction = !process_remote_message(&s);
1363+
inside_transaction = !process_remote_message(&s, receiver_ctx);
13461364
break;
13471365
}
13481366
case 'Z':

src/pglogical_output.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
#include "replication/message.h"
3737
#include "replication/origin.h"
3838

39+
#include "storage/ipc.h"
40+
3941
#include "utils/builtins.h"
4042
#include "utils/catcache.h"
4143
#include "utils/guc.h"
@@ -471,6 +473,13 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
471473
MtmOutputPluginPrepareWrite(ctx, true, true);
472474
data->api->write_caughtup(ctx->out, data, ctx->reader->EndRecPtr);
473475
MtmOutputPluginWrite(ctx, true, true);
476+
477+
/*
478+
* This hook can be called mupltiple times when there is concurrent
479+
* load, so exit right after we wrote recovery message first time during
480+
* current recovery session.
481+
*/
482+
proc_exit(0);
474483
}
475484
}
476485

src/pglogical_proto.c

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
#include "utils/typcache.h"
3737
#include "utils/snapmgr.h"
3838

39+
#include "replication/message.h"
40+
3941
#include "pglogical_relid_map.h"
4042

4143
#include "multimaster.h"
@@ -272,7 +274,9 @@ void pglogical_write_caughtup(StringInfo out, PGLogicalOutputData *data,
272274
MtmDecoderPrivate *hooks_data = (MtmDecoderPrivate *) data->hooks.hooks_private_data;
273275

274276
Assert(hooks_data->is_recovery);
275-
pq_sendbyte(out, 'Z'); /* sending CAUGHT-UP */
277+
/* sending CAUGHT-UP */
278+
pq_sendbyte(out, 'Z');
279+
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_RECOVERY_CAUGHTUP, false);
276280
}
277281

278282
/*
@@ -736,15 +740,14 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
736740
}
737741
}
738742

739-
MtmLock(LW_EXCLUSIVE);
740-
741743
/*
742744
* Set proper originId mappings.
743745
*
744746
* This is copypasted from receiver. Better to have normal init method
745747
* to setup all stuff in shared memory. But seems that there is no such
746748
* callback in vanilla pg and adding one will require some carefull thoughts.
747749
*/
750+
MtmLock(LW_EXCLUSIVE);
748751
for (i = 0; i < Mtm->nAllNodes; i++)
749752
{
750753
char *originName;
@@ -777,6 +780,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
777780
mtm_log(ERROR, "Out-of-clique node %d tries to connect",
778781
MtmReplicationNodeId);
779782
}
783+
MtmUnlock();
780784

781785
if (hooks_data->is_recovery)
782786
{
@@ -785,17 +789,40 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
785789
MtmReplicationNodeId);
786790

787791
Assert(MyReplicationSlot != NULL);
788-
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERY, true);
792+
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERY, false);
789793
}
790794
else
791795
{
792796
mtm_log(ProtoTraceMode,
793797
"Walsender starts in recovered mode to node %d",
794798
MtmReplicationNodeId);
795799

796-
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERED, true);
800+
/*
801+
* Indicate receiver that after this point in wal it is safe to send
802+
* transaction to the pool of workers. Before this point in wal (or in other
803+
* words before we processed MTM_NEIGHBOR_WAL_SENDER_START_RECOVERY event
804+
* and enabled this node in our disabledNodeMask) our backends are not waiting
805+
* for prepare confirmations from this node, so receiver can get precommit
806+
* before it will finish prepare. This will leave receiver with lots of
807+
* prepared transactions that will never be commited as precommit and commit
808+
* already happend before prepare.
809+
*
810+
* To ensure that all transactions ended after this message had seen right
811+
* disabledNodeMask we took MtmCommitBarrier in exclusive mode to await
812+
* finish of all transactions with potentially old disabledNodeMask.
813+
*/
814+
if (!hooks_data->is_recovery)
815+
{
816+
char *dest_id = psprintf("%d", MtmReplicationNodeId);
817+
818+
LWLockAcquire(MtmCommitBarrier, LW_EXCLUSIVE);
819+
MtmStateProcessNeighborEvent(MtmReplicationNodeId, MTM_NEIGHBOR_WAL_SENDER_START_RECOVERED, false);
820+
LWLockRelease(MtmCommitBarrier);
821+
822+
XLogFlush(LogLogicalMessage("P", dest_id, strlen(dest_id), false));
823+
}
797824
}
798-
MtmUnlock();
825+
799826

800827
}
801828

0 commit comments

Comments
 (0)