Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 008afc4

Browse files
committed
[PGPRO-4074] Additional mm XACT_EVENTs and distributed deadlock hook.
This is finely selected cherry pick of dec7a31cadc9048. I've removed xtm because we don't need almost all fields. ATX save-resume hook is not ported but will be in the future once ATX lands itself. Parallel workers serialize-deserialize hook is not ported because currently I see no value for mm in it; long time ago it was used for importing csn snapshots. Unused XACT_EVENTs are not ported as well. tags: multimaster (cherry picked from commit ec1bf5004833e95a18da8e05443a4f6e308d1fb6)
1 parent 2688852 commit 008afc4

File tree

7 files changed

+49
-6
lines changed

7 files changed

+49
-6
lines changed

contrib/postgres_fdw/connection.c

+13
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,16 @@ pgfdw_xact_callback(XactEvent event, void *arg)
713713
HASH_SEQ_STATUS scan;
714714
ConnCacheEntry *entry;
715715

716+
/* Do nothing for these (multimaster support) events */
717+
switch (event)
718+
{
719+
case XACT_EVENT_START:
720+
case XACT_EVENT_COMMIT_COMMAND:
721+
return;
722+
default:
723+
break;
724+
}
725+
716726
/* Quick exit if no connections were touched in this transaction. */
717727
if (!xact_got_connection)
718728
return;
@@ -861,6 +871,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
861871
/* Disarm changing_xact_state if it all worked. */
862872
entry->changing_xact_state = abort_cleanup_failure;
863873
break;
874+
case XACT_EVENT_START:
875+
case XACT_EVENT_COMMIT_COMMAND:
876+
Assert(false);
864877
}
865878
}
866879

src/backend/access/transam/xact.c

+5-2
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,6 @@ static void AtCommit_Memory(void);
310310
static void AtStart_Cache(void);
311311
static void AtStart_Memory(void);
312312
static void AtStart_ResourceOwner(void);
313-
static void CallXactCallbacks(XactEvent event);
314313
static void CallSubXactCallbacks(SubXactEvent event,
315314
SubTransactionId mySubid,
316315
SubTransactionId parentSubid);
@@ -2827,6 +2826,7 @@ StartTransactionCommand(void)
28272826
case TBLOCK_DEFAULT:
28282827
StartTransaction();
28292828
s->blockState = TBLOCK_STARTED;
2829+
CallXactCallbacks(XACT_EVENT_START);
28302830
break;
28312831

28322832
/*
@@ -2919,6 +2919,8 @@ CommitTransactionCommand(void)
29192919
{
29202920
TransactionState s = CurrentTransactionState;
29212921

2922+
CallXactCallbacks(XACT_EVENT_COMMIT_COMMAND);
2923+
29222924
if (s->chain)
29232925
SaveTransactionCharacteristics();
29242926

@@ -3536,7 +3538,7 @@ UnregisterXactCallback(XactCallback callback, void *arg)
35363538
}
35373539
}
35383540

3539-
static void
3541+
void
35403542
CallXactCallbacks(XactEvent event)
35413543
{
35423544
XactCallbackItem *item;
@@ -5283,6 +5285,7 @@ StartParallelWorkerTransaction(char *tstatespace)
52835285

52845286
Assert(CurrentTransactionState->blockState == TBLOCK_DEFAULT);
52855287
StartTransaction();
5288+
CallXactCallbacks(XACT_EVENT_START);
52865289

52875290
tstate = (SerializedTransactionState *) tstatespace;
52885291
XactIsoLevel = tstate->xactIsoLevel;

src/backend/storage/lmgr/deadlock.c

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "storage/proc.h"
3333
#include "utils/memutils.h"
3434

35+
bool (*DetectGlobalDeadLock) (PGPROC *proc) = NULL;
3536

3637
/*
3738
* One edge in the waits-for graph.
@@ -280,6 +281,8 @@ DeadLockCheck(PGPROC *proc)
280281
return DS_SOFT_DEADLOCK;
281282
else if (blocking_autovacuum_proc != NULL)
282283
return DS_BLOCKED_BY_AUTOVACUUM;
284+
else if (DetectGlobalDeadLock && DetectGlobalDeadLock(proc))
285+
return DS_DISTRIBUTED_DEADLOCK;
283286
else
284287
return DS_NO_DEADLOCK;
285288
}

src/backend/storage/lmgr/proc.c

+20-2
Original file line numberDiff line numberDiff line change
@@ -1486,6 +1486,22 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
14861486
"Processes holding the lock: %s. Wait queue: %s.",
14871487
lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
14881488
}
1489+
else if (deadlock_state == DS_DISTRIBUTED_DEADLOCK)
1490+
{
1491+
/*
1492+
* This message is a bit redundant with the error that will be
1493+
* reported subsequently, but in some cases the error report
1494+
* might not make it to the log (eg, if it's caught by an
1495+
* exception handler), and we want to ensure all long-wait
1496+
* events get logged.
1497+
*/
1498+
ereport(LOG,
1499+
(errmsg("process %d detected a distributed deadlock while waiting for %s on %s after %ld.%03d ms",
1500+
MyProcPid, modename, buf.data, msecs, usecs),
1501+
(errdetail_log_plural("Process holding the lock: %s. Wait queue: %s.",
1502+
"Processes holding the lock: %s. Wait queue: %s.",
1503+
lockHoldersNum, lock_holders_sbuf.data, lock_waiters_sbuf.data))));
1504+
}
14891505

14901506
if (myWaitStatus == STATUS_WAITING)
14911507
ereport(LOG,
@@ -1510,7 +1526,8 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
15101526
* future-proofing, print a message if it looks like someone
15111527
* else kicked us off the lock.
15121528
*/
1513-
if (deadlock_state != DS_HARD_DEADLOCK)
1529+
if (deadlock_state != DS_HARD_DEADLOCK &&
1530+
deadlock_state != DS_DISTRIBUTED_DEADLOCK)
15141531
ereport(LOG,
15151532
(errmsg("process %d failed to acquire %s on %s after %ld.%03d ms",
15161533
MyProcPid, modename, buf.data, msecs, usecs),
@@ -1727,7 +1744,8 @@ CheckDeadLock(void)
17271744
/* Run the deadlock check, and set deadlock_state for use by ProcSleep */
17281745
deadlock_state = DeadLockCheck(MyProc);
17291746

1730-
if (deadlock_state == DS_HARD_DEADLOCK)
1747+
if (deadlock_state == DS_HARD_DEADLOCK ||
1748+
deadlock_state == DS_DISTRIBUTED_DEADLOCK)
17311749
{
17321750
/*
17331751
* Oops. We have a deadlock.

src/include/access/xact.h

+4-1
Original file line numberDiff line numberDiff line change
@@ -108,14 +108,16 @@ extern int MyXactFlags;
108108
*/
109109
typedef enum
110110
{
111+
XACT_EVENT_START,
111112
XACT_EVENT_COMMIT,
112113
XACT_EVENT_PARALLEL_COMMIT,
113114
XACT_EVENT_ABORT,
114115
XACT_EVENT_PARALLEL_ABORT,
115116
XACT_EVENT_PREPARE,
116117
XACT_EVENT_PRE_COMMIT,
117118
XACT_EVENT_PARALLEL_PRE_COMMIT,
118-
XACT_EVENT_PRE_PREPARE
119+
XACT_EVENT_PRE_PREPARE,
120+
XACT_EVENT_COMMIT_COMMAND
119121
} XactEvent;
120122

121123
typedef void (*XactCallback) (XactEvent event, void *arg);
@@ -459,5 +461,6 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
459461
extern void EnterParallelMode(void);
460462
extern void ExitParallelMode(void);
461463
extern bool IsInParallelMode(void);
464+
extern void CallXactCallbacks(XactEvent event);
462465

463466
#endif /* XACT_H */

src/include/storage/lock.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -500,8 +500,9 @@ typedef enum
500500
DS_NO_DEADLOCK, /* no deadlock detected */
501501
DS_SOFT_DEADLOCK, /* deadlock avoided by queue rearrangement */
502502
DS_HARD_DEADLOCK, /* deadlock, no way out but ERROR */
503-
DS_BLOCKED_BY_AUTOVACUUM /* no deadlock; queue blocked by autovacuum
503+
DS_BLOCKED_BY_AUTOVACUUM, /* no deadlock; queue blocked by autovacuum
504504
* worker */
505+
DS_DISTRIBUTED_DEADLOCK /* distributed deadlock detected by DTM */
505506
} DeadLockState;
506507

507508
/*

src/include/storage/proc.h

+2
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ extern PGDLLIMPORT int LockTimeout;
296296
extern PGDLLIMPORT int IdleInTransactionSessionTimeout;
297297
extern bool log_lock_waits;
298298

299+
/* hook for extensions to catch distributed deadlock */
300+
extern PGDLLIMPORT bool (*DetectGlobalDeadLock) (PGPROC *proc);
299301

300302
/*
301303
* Function Prototypes

0 commit comments

Comments
 (0)