Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 629cdfd

Browse files
committed
dmq integration: normal commit procedure works
1 parent a5a05cb commit 629cdfd

11 files changed

+1727
-93
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11

22
EXTENSION = multimaster
33
DATA = multimaster--1.0.sql
4-
OBJS = multimaster.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o state.o
4+
OBJS = multimaster.o dmq.o commit.o bytebuf.o bgwpool.o pglogical_output.o \
5+
pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o \
6+
pglogical_config.o pglogical_relid_map.o ddd.o bkb.o spill.o state.o
57
MODULE_big = multimaster
68

79
PG_CPPFLAGS = -I$(libpq_srcdir)

commit.c

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
#include "postgres.h"
2+
#include "multimaster.h"
3+
#include "storage/proc.h"
4+
#include "utils/guc.h"
5+
#include "miscadmin.h"
6+
#include "commands/dbcommands.h"
7+
8+
static Oid MtmDatabaseId;
9+
static bool DmqSubscribed;
10+
static int sender_to_node[64];
11+
12+
MtmCurrentTrans MtmTx;
13+
14+
static void MtmBeginTransaction(MtmCurrentTrans* x);
15+
static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
16+
17+
static bool GatherPrepares(MtmCurrentTrans* x, nodemask_t participantsMask,
18+
int *failed_at);
19+
static void GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask);
20+
21+
22+
23+
24+
25+
void
26+
MtmXactCallback2(XactEvent event, void *arg)
27+
{
28+
if (!MtmIsLogicalReceiver)
29+
{
30+
switch (event)
31+
{
32+
case XACT_EVENT_START:
33+
MtmBeginTransaction(&MtmTx);
34+
break;
35+
case XACT_EVENT_PRE_PREPARE:
36+
MtmPrePrepareTransaction(&MtmTx);
37+
break;
38+
case XACT_EVENT_COMMIT_COMMAND:
39+
if (!MtmTx.isTransactionBlock && !IsSubTransaction())
40+
MtmTwoPhaseCommit(&MtmTx);
41+
break;
42+
default:
43+
break;
44+
}
45+
}
46+
}
47+
48+
static void
49+
MtmBeginTransaction(MtmCurrentTrans* x)
50+
{
51+
// XXX: move it down the callbacks?
52+
x->isDistributed = MtmIsUserTransaction();
53+
x->containsDML = false; // will be set by executor hook
54+
x->isTransactionBlock = IsTransactionBlock();
55+
56+
// /* Application name can be changed using PGAPPNAME environment variable */
57+
// if (x->isDistributed && Mtm->status != MTM_ONLINE
58+
// && strcmp(application_name, MULTIMASTER_ADMIN) != 0
59+
// && strcmp(application_name, MULTIMASTER_BROADCAST_SERVICE) != 0)
60+
// {
61+
// /* Reject all user's transactions at offline cluster.
62+
// * Allow execution of transaction by bg-workers to makeit possible to perform recovery.
63+
// */
64+
// MTM_ELOG(ERROR,
65+
// "Multimaster node is not online: current status %s",
66+
// MtmNodeStatusMnem[Mtm->status]);
67+
// }
68+
}
69+
70+
static void
71+
MtmPrePrepareTransaction(MtmCurrentTrans* x)
72+
{
73+
if (!x->isDistributed)
74+
return;
75+
76+
if (!MtmDatabaseId)
77+
MtmDatabaseId = get_database_oid(MtmDatabaseName, false);
78+
79+
if (MtmDatabaseId != MyDatabaseId)
80+
MTM_ELOG(ERROR,
81+
"Refusing to work. Multimaster configured to work with database '%s'",
82+
MtmDatabaseName);
83+
84+
Assert(TransactionIdIsValid(GetCurrentTransactionId()));
85+
Assert(*x->gid != '\0');
86+
}
87+
88+
bool // XXX: do we need that bool?
89+
MtmTwoPhaseCommit(MtmCurrentTrans* x)
90+
{
91+
nodemask_t participantsMask;
92+
bool ret;
93+
int failed_at;
94+
95+
if (!x->isDistributed || !x->containsDML)
96+
return false;
97+
98+
if (!DmqSubscribed)
99+
{
100+
int i, j;
101+
102+
for (j = 0, i = 0; i < Mtm->nAllNodes; i++)
103+
{
104+
if (i + 1 == MtmNodeId)
105+
continue;
106+
107+
dmq_stream_subscribe(psprintf("node%d", i + 1),
108+
psprintf("be%d", MyProc->pgprocno));
109+
110+
sender_to_node[j++] = i + 1;
111+
}
112+
DmqSubscribed = true;
113+
}
114+
115+
if (!x->isTransactionBlock)
116+
{
117+
BeginTransactionBlock();
118+
x->isTransactionBlock = true;
119+
CommitTransactionCommand();
120+
StartTransactionCommand();
121+
}
122+
123+
MtmGenerateGid(x->gid);
124+
participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) &
125+
~Mtm->disabledNodeMask &
126+
~((nodemask_t)1 << (MtmNodeId-1));
127+
128+
ret = PrepareTransactionBlock(x->gid);
129+
if (!ret)
130+
{
131+
MTM_ELOG(WARNING, "Failed to prepare transaction %s (%llu)", x->gid, (long64)x->xid);
132+
return false;
133+
}
134+
CommitTransactionCommand();
135+
136+
ret = GatherPrepares(x, participantsMask, &failed_at);
137+
if (!ret)
138+
{
139+
FinishPreparedTransaction(x->gid, false, false);
140+
MTM_ELOG(ERROR, "Failed to prepare transaction %s at node %d",
141+
x->gid, failed_at);
142+
}
143+
144+
SetPreparedTransactionState(x->gid, MULTIMASTER_PRECOMMITTED);
145+
GatherPrecommits(x, participantsMask);
146+
147+
StartTransactionCommand();
148+
FinishPreparedTransaction(x->gid, true, false);
149+
150+
return true;
151+
}
152+
153+
static bool
154+
GatherPrepares(MtmCurrentTrans* x, nodemask_t participantsMask, int *failed_at)
155+
{
156+
bool prepared = true;
157+
158+
Assert(participantsMask != 0);
159+
160+
while (participantsMask != 0)
161+
{
162+
DmqSenderId sender_id;
163+
StringInfoData buffer;
164+
MtmArbiterMessage *msg;
165+
166+
dmq_pop(&sender_id, &buffer);
167+
msg = (MtmArbiterMessage *) buffer.data;
168+
169+
// elog(LOG, "GatherPrepares: got %s from node%d", msg->gid, sender_to_node[sender_id]);
170+
ereport(LOG,
171+
(errmsg("GatherPrepares: got %s from node%d",
172+
msg->gid, sender_to_node[sender_id]),
173+
errhidestmt(true)));
174+
175+
Assert(msg->node == sender_to_node[sender_id]);
176+
Assert(msg->code == MSG_PREPARED || msg->code == MSG_ABORTED);
177+
Assert(strcmp(msg->gid, x->gid) == 0);
178+
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
179+
180+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
181+
182+
if (msg->code == MSG_ABORTED)
183+
{
184+
prepared = false;
185+
*failed_at = msg->node;
186+
}
187+
}
188+
189+
return prepared;
190+
}
191+
192+
static void
193+
GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask)
194+
{
195+
Assert(participantsMask != 0);
196+
197+
while (participantsMask != 0)
198+
{
199+
DmqSenderId sender_id;
200+
StringInfoData buffer;
201+
MtmArbiterMessage *msg;
202+
203+
dmq_pop(&sender_id, &buffer);
204+
msg = (MtmArbiterMessage *) buffer.data;
205+
206+
elog(LOG, "GatherPrecommits: got %s from node%d", msg->gid, sender_to_node[sender_id]);
207+
208+
Assert(msg->node == sender_to_node[sender_id]);
209+
Assert(msg->code == MSG_PRECOMMITTED);
210+
Assert(strcmp(msg->gid, x->gid) == 0);
211+
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
212+
213+
BIT_CLEAR(participantsMask, sender_to_node[sender_id] - 1);
214+
}
215+
}

0 commit comments

Comments
 (0)