Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 1ce9da8

Browse files
committed
dmq integration: resolver fixes, extend TxFinish logging with keyword to grep for
1 parent c5a1712 commit 1ce9da8

File tree

4 files changed

+30
-20
lines changed

4 files changed

+30
-20
lines changed

commit.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
106106
char stream[DMQ_NAME_MAXLEN];
107107
pgid_t gid;
108108

109+
/* XXX: avoid Mtm->extension_created */
109110
if (!x->isDistributed || !x->containsDML || !Mtm->extension_created)
110111
return false;
111112

@@ -134,6 +135,7 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
134135
xid = GetCurrentTransactionId();
135136
sprintf(stream, "xid" XID_FMT, xid);
136137
dmq_stream_subscribe(stream);
138+
mtm_log(MtmTxTrace, "%s subscribed for %s", gid, stream);
137139
x->xid = xid;
138140

139141
MtmGenerateGid(gid, xid);
@@ -144,7 +146,7 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
144146
ret = PrepareTransactionBlock(gid);
145147
if (!ret)
146148
{
147-
mtm_log(MtmTxFinish, "%s prepared", gid);
149+
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
148150
mtm_log(WARNING, "Failed to prepare transaction %s", gid);
149151
return false;
150152
}
@@ -155,20 +157,21 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
155157
{
156158
dmq_stream_unsubscribe(stream);
157159
FinishPreparedTransaction(gid, false, false);
158-
mtm_log(MtmTxFinish, "%s aborted", gid);
160+
mtm_log(MtmTxFinish, "TXFINISH: %s aborted", gid);
159161
mtm_log(ERROR, "Failed to prepare transaction %s at node %d",
160162
gid, failed_at);
161163
}
162164

163165
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
164-
mtm_log(MtmTxFinish, "%s precommittted", gid);
166+
mtm_log(MtmTxFinish, "TXFINISH: %s precommittted", gid);
165167
GatherPrecommits(x, participantsMask);
166168

167169
StartTransactionCommand();
168170
FinishPreparedTransaction(gid, true, false);
169-
mtm_log(MtmTxFinish, "%s committted", gid);
171+
mtm_log(MtmTxFinish, "TXFINISH: %s committted", gid);
170172

171173
dmq_stream_unsubscribe(stream);
174+
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);
172175

173176
return true;
174177
}

multimaster--1.0.sql

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION multimaster" to load this file. \quit
33

4-
-- -- check that multimaster shared library is really loaded
5-
-- DO $$
6-
-- BEGIN
7-
-- IF strpos(current_setting('shared_preload_libraries'), 'multimaster') = 0 THEN
8-
-- RAISE EXCEPTION 'Multimaster must be loaded via shared_preload_libraries. Refusing to proceed.';
9-
-- END IF;
10-
-- END
11-
-- $$;
4+
-- check that multimaster shared library is really loaded
5+
DO $$
6+
BEGIN
7+
IF strpos(current_setting('shared_preload_libraries'), 'multimaster') = 0 THEN
8+
RAISE EXCEPTION 'Multimaster must be loaded via shared_preload_libraries. Refusing to proceed.';
9+
END IF;
10+
END
11+
$$;
1212

1313

1414
CREATE FUNCTION mtm.dmq_receiver_loop(sender_name text) RETURNS void

pglogical_apply.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
#include "pglogical_relid_map.h"
6060
#include "spill.h"
6161
#include "state.h"
62+
#include "logger.h"
6263

6364
typedef struct TupleData
6465
{
@@ -835,6 +836,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
835836
} else {
836837
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
837838
}
839+
mtm_log(MtmTxFinish, "TXFINISH: %s precommittted", gid);
838840

839841
// MtmPrecommitTransaction(gid);
840842

@@ -876,6 +878,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
876878

877879
/* PREPARE itself */
878880
res = PrepareTransactionBlock(gid);
881+
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
879882
CommitTransactionCommand();
880883

881884
MtmDeadlockDetectorRemoveXact(xid);
@@ -944,6 +947,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
944947
MtmSetCurrentTransactionGID(gid, origin_node);
945948
TXFINISH("%s COMMIT, PGLOGICAL_COMMIT_PREPARED csn=%lld", gid, csn);
946949
FinishPreparedTransaction(gid, true, false);
950+
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
947951
MTM_LOG2("Distributed transaction %s is committed", gid);
948952
CommitTransactionCommand();
949953
Assert(!MtmTransIsActive());
@@ -959,6 +963,7 @@ process_remote_commit(StringInfo in, GlobalTransactionId *current_gtid)
959963
StartTransactionCommand();
960964
FinishPreparedTransaction(gid, false, true);
961965
CommitTransactionCommand();
966+
mtm_log(MtmTxFinish, "TXFINISH: %s aborted", gid);
962967
break;
963968
}
964969
default:

resolver.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -255,19 +255,21 @@ resolve_tx(const char *gid, int node_id, MtmTxState state)
255255

256256
tx->state[node_id-1] = state;
257257

258+
/* XXX: missing ok because we call this concurrently with logrep recovery */
259+
258260
if (exists(tx, MtmTxAborted | MtmTxNotFound))
259261
{
260-
FinishPreparedTransaction(gid, false, false);
261-
mtm_log(ResolverTxFinish, "%s aborted", gid);
262+
FinishPreparedTransaction(gid, false, true);
263+
mtm_log(ResolverTxFinish, "TXFINISH: %s aborted", gid);
262264
hash_search(gid2tx, gid, HASH_REMOVE, &found);
263265
Assert(found);
264266
return;
265267
}
266268

267269
if (exists(tx, MtmTxCommited))
268270
{
269-
FinishPreparedTransaction(gid, true, false);
270-
mtm_log(ResolverTxFinish, "%s committed", gid);
271+
FinishPreparedTransaction(gid, true, true);
272+
mtm_log(ResolverTxFinish, "TXFINISH: %s committed", gid);
271273
hash_search(gid2tx, gid, HASH_REMOVE, &found);
272274
Assert(found);
273275
return;
@@ -279,8 +281,8 @@ resolve_tx(const char *gid, int node_id, MtmTxState state)
279281
// XXX: do that through PreCommit
280282
// SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
281283
// tx->state[MtmNodeId-1] = MtmTxPreCommited;
282-
FinishPreparedTransaction(gid, true, false);
283-
mtm_log(ResolverTxFinish, "%s committed", gid);
284+
FinishPreparedTransaction(gid, true, true);
285+
mtm_log(ResolverTxFinish, "TXFINISH: %s committed", gid);
284286
hash_search(gid2tx, gid, HASH_REMOVE, &found);
285287
Assert(found);
286288
return;
@@ -291,8 +293,8 @@ resolve_tx(const char *gid, int node_id, MtmTxState state)
291293
// XXX: do that through PreAbort
292294
// SetPreparedTransactionState(gid, MULTIMASTER_PREABORTED);
293295
// tx->state[MtmNodeId-1] = MtmTxPreAborted;
294-
FinishPreparedTransaction(gid, false, false);
295-
mtm_log(ResolverTxFinish, "%s aborted", gid);
296+
FinishPreparedTransaction(gid, false, true);
297+
mtm_log(ResolverTxFinish, "TXFINISH: %s aborted", gid);
296298
hash_search(gid2tx, gid, HASH_REMOVE, &found);
297299
Assert(found);
298300
return;

0 commit comments

Comments
 (0)