Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fe80270

Browse files
committed
Fix several issues with resolver.
Resolver can catch an error when trying to commit transaction that is already during commit by receiver (see gxact->locking_backend). We can catch those, but better just let it happend, restart bgworker and continue resolving. In order to be able to continue after restart delete old task queue and move unresolved txes into shared hash. Also stubbornly scatter requests for stalled transaction after some timeout.
1 parent f695a4e commit fe80270

File tree

2 files changed

+79
-98
lines changed

2 files changed

+79
-98
lines changed

src/dmq.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ dmq_shmem_startup_hook(void)
215215
}
216216
}
217217

218-
// XXX: move it also under !found ?
219218
dmq_state->subscriptions = ShmemInitHash("dmq_stream_subscriptions",
220219
DMQ_MAX_SUBS_PER_BACKEND*MaxBackends,
221220
DMQ_MAX_SUBS_PER_BACKEND*MaxBackends,

src/resolver.c

Lines changed: 79 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,9 @@
2929
* to create tasks for resolver from other backends.
3030
*/
3131

32-
typedef enum
33-
{
34-
NoTask,
35-
ResolveForNode,
36-
ResolveAll
37-
} task_type;
38-
39-
typedef struct
40-
{
41-
task_type type;
42-
int arg;
43-
} resolver_task;
44-
4532
typedef struct
4633
{
4734
LWLock *lock;
48-
resolver_task task_queue[MTM_MAX_NODES];
49-
int tasks;
5035
pid_t pid;
5136
} resolver_state_data;
5237

@@ -89,6 +74,7 @@ resolver_shmem_size(void)
8974
static void
9075
resolver_shmem_startup_hook()
9176
{
77+
HASHCTL hash_info;
9278
bool found;
9379

9480
if (PreviousShmemStartupHook)
@@ -101,11 +87,16 @@ resolver_shmem_startup_hook()
10187

10288
if (!found)
10389
{
104-
resolver_state->lock = &(GetNamedLWLockTranche("resolver"))->lock;
105-
resolver_state->tasks = 0;
90+
resolver_state->lock = &(GetNamedLWLockTranche("resolver")->lock);
10691
resolver_state->pid = 0;
10792
}
10893

94+
/* init map with current unresolved transactions */
95+
hash_info.keysize = GIDSIZE;
96+
hash_info.entrysize = sizeof(resolver_tx);
97+
gid2tx = ShmemInitHash("gid2tx", MaxBackends, 2*MaxBackends, &hash_info,
98+
HASH_ELEM);
99+
109100
LWLockRelease(AddinShmemInitLock);
110101
}
111102

@@ -126,7 +117,7 @@ ResolverInit(void)
126117
memset(&worker, 0, sizeof(worker));
127118
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
128119
worker.bgw_start_time = BgWorkerStart_ConsistentState;
129-
worker.bgw_restart_time = 5;
120+
worker.bgw_restart_time = 1;
130121
worker.bgw_notify_pid = 0;
131122
worker.bgw_main_arg = 0;
132123
sprintf(worker.bgw_library_name, "multimaster");
@@ -146,31 +137,73 @@ ResolverInit(void)
146137
*
147138
*****************************************************************************/
148139

149-
void
150-
ResolveTransactionsForNode(int node_id)
140+
141+
142+
static bool
143+
load_tasks(int node_id)
151144
{
152-
resolver_task task = {ResolveForNode, node_id};
145+
PreparedTransaction pxacts;
146+
int n_xacts,
147+
added_xacts = 0,
148+
i;
153149

154-
LWLockAcquire(resolver_state->lock, LW_EXCLUSIVE);
150+
Assert(LWLockHeldByMeInMode(resolver_state->lock, LW_EXCLUSIVE));
151+
Assert(node_id == -1 || node_id > 0);
155152

156-
Assert(resolver_state->tasks < MTM_MAX_NODES);
157-
resolver_state->task_queue[resolver_state->tasks++] = task;
153+
n_xacts = GetPreparedTransactions(&pxacts);
158154

159-
LWLockRelease(resolver_state->lock);
155+
for (i = 0; i < n_xacts; i++)
156+
{
157+
char const *gid = pxacts[i].gid;
158+
int xact_node_id;
159+
160+
xact_node_id = MtmGidParseNodeId(gid);
161+
162+
if (xact_node_id > 0 &&
163+
(node_id == -1 || node_id == xact_node_id))
164+
{
165+
int j;
166+
resolver_tx *tx;
167+
168+
tx = (resolver_tx *) hash_search(gid2tx, gid, HASH_ENTER, NULL);
169+
added_xacts++;
170+
171+
for (j = 0; j < Mtm->nAllNodes; j++)
172+
tx->state[j] = MtmTxUnknown;
173+
174+
if (strcmp(pxacts[i].state_3pc, MULTIMASTER_PRECOMMITTED) == 0)
175+
tx->state[MtmNodeId - 1] = MtmTxPreCommited;
176+
else
177+
tx->state[MtmNodeId - 1] = MtmTxPrepared;
178+
}
179+
}
180+
181+
mtm_log(ResolverTasks, "[RESOLVER] got %d transactions to resolve",
182+
added_xacts);
183+
184+
return true;
160185
}
161186

162187

163188
void
164-
ResolveAllTransactions(void)
189+
ResolveTransactionsForNode(int node_id)
165190
{
166-
resolver_task task = {ResolveAll, 0};
191+
pid_t resolver_pid;
167192

168193
LWLockAcquire(resolver_state->lock, LW_EXCLUSIVE);
194+
load_tasks(node_id);
195+
resolver_pid = resolver_state->pid;
196+
LWLockRelease(resolver_state->lock);
197+
198+
if (resolver_pid)
199+
kill(resolver_pid, SIGHUP);
200+
}
169201

170-
Assert(resolver_state->tasks < MTM_MAX_NODES);
171-
resolver_state->task_queue[resolver_state->tasks++] = task;
172202

173-
LWLockRelease(resolver_state->lock);
203+
void
204+
ResolveAllTransactions(void)
205+
{
206+
ResolveTransactionsForNode(-1);
174207
}
175208

176209
char *
@@ -242,6 +275,7 @@ resolve_tx(const char *gid, int node_id, MtmTxState state)
242275
bool found;
243276
resolver_tx *tx;
244277

278+
Assert(LWLockHeldByMeInMode(resolver_state->lock, LW_EXCLUSIVE));
245279
Assert(state != MtmTxInProgress);
246280

247281
tx = hash_search(gid2tx, gid, HASH_FIND, &found);
@@ -316,66 +350,14 @@ resolve_tx(const char *gid, int node_id, MtmTxState state)
316350
*****************************************************************************/
317351

318352

319-
static bool
320-
load_tasks_if_any(void)
321-
{
322-
PreparedTransaction pxacts;
323-
int n_xacts, added_xacts = 0, i;
324-
resolver_task task = {NoTask, 0};
325-
326-
LWLockAcquire(resolver_state->lock, LW_EXCLUSIVE);
327-
if (resolver_state->tasks > 0)
328-
{
329-
task = resolver_state->task_queue[0];
330-
resolver_state->task_queue[0] =
331-
resolver_state->task_queue[--resolver_state->tasks];
332-
}
333-
LWLockRelease(resolver_state->lock);
334-
335-
if (task.type == NoTask)
336-
return false;
337-
338-
n_xacts = GetPreparedTransactions(&pxacts);
339-
340-
for (i = 0; i < n_xacts; i++)
341-
{
342-
char const *gid = pxacts[i].gid;
343-
int xact_node_id;
344-
345-
xact_node_id = MtmGidParseNodeId(gid);
346-
347-
if (xact_node_id > 0 &&
348-
(task.type == ResolveAll ||
349-
(task.type == ResolveForNode && task.arg == xact_node_id)))
350-
{
351-
int j;
352-
resolver_tx *tx;
353-
354-
tx = (resolver_tx *) hash_search(gid2tx, gid, HASH_ENTER, NULL);
355-
added_xacts++;
356-
357-
for (j = 0; j < Mtm->nAllNodes; j++)
358-
tx->state[j] = MtmTxUnknown;
359-
360-
if (strcmp(pxacts[i].state_3pc, MULTIMASTER_PRECOMMITTED) == 0)
361-
tx->state[MtmNodeId - 1] = MtmTxPreCommited;
362-
else
363-
tx->state[MtmNodeId - 1] = MtmTxPrepared;
364-
}
365-
}
366-
367-
mtm_log(ResolverTasks, "[RESOLVER] got %d transactions to resolve",
368-
added_xacts);
369-
370-
return true;
371-
}
372-
373353
static void
374354
scatter_status_requests(void)
375355
{
376356
HASH_SEQ_STATUS hash_seq;
377357
resolver_tx *tx;
378358

359+
LWLockAcquire(resolver_state->lock, LW_SHARED);
360+
379361
hash_seq_init(&hash_seq, gid2tx);
380362
while ((tx = hash_seq_search(&hash_seq)) != NULL)
381363
{
@@ -404,6 +386,8 @@ scatter_status_requests(void)
404386
}
405387
}
406388

389+
LWLockRelease(resolver_state->lock);
390+
407391
}
408392

409393
static void
@@ -423,18 +407,20 @@ handle_responses(void)
423407
Assert(msg->node == node_id);
424408
Assert(msg->code == MSG_POLL_STATUS);
425409

410+
LWLockAcquire(resolver_state->lock, LW_EXCLUSIVE);
426411
resolve_tx(msg->gid, node_id, msg->state);
412+
LWLockRelease(resolver_state->lock);
427413
}
428414
}
429415

430416
void
431417
ResolverMain(void)
432418
{
433-
bool new_tasks;
434-
HASHCTL ctl;
435419
int i, sender_id = 0;
420+
bool send_requests = true;
436421

437422
/* init this worker */
423+
pqsignal(SIGHUP, PostgresSigHupHandler);
438424
pqsignal(SIGTERM, die);
439425
BackgroundWorkerUnblockSignals();
440426

@@ -444,11 +430,6 @@ ResolverMain(void)
444430

445431
MtmWaitForExtensionCreation();
446432

447-
/* init map with current unresolved transactions */
448-
ctl.keysize = GIDSIZE;
449-
ctl.entrysize = sizeof(resolver_tx);
450-
gid2tx = hash_create("gid2tx", MaxBackends, &ctl, HASH_ELEM);
451-
452433
/* subscribe to status-responses channels from other nodes */
453434
for (i = 0; i < Mtm->nAllNodes; i++)
454435
{
@@ -471,14 +452,11 @@ ResolverMain(void)
471452

472453
CHECK_FOR_INTERRUPTS();
473454

474-
/* Check if we got any new transactions to work on */
475-
new_tasks = load_tasks_if_any();
476-
477455
/* Scatter requests for unresolved transactions */
478-
if (new_tasks) /* XXX: retry after some timeout */
456+
if (send_requests)
479457
{
480-
new_tasks = false;
481458
scatter_status_requests();
459+
send_requests = false;
482460
}
483461

484462
/* Gather responses */
@@ -487,9 +465,13 @@ ResolverMain(void)
487465
/* Sleep untl somebody wakes us */
488466
rc = WaitLatch(MyLatch,
489467
WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
490-
250,
468+
3000,
491469
PG_WAIT_TIMEOUT);
492470

471+
/* re-try to send requests if there are some unresolved transactions */
472+
if (rc & WL_TIMEOUT)
473+
send_requests = true;
474+
493475
/* Emergency bailout if postmaster has died */
494476
if (rc & WL_POSTMASTER_DEATH)
495477
proc_exit(1);

0 commit comments

Comments
 (0)