79
79
#include "access/xact.h"
80
80
81
81
#include "catalog/indexing.h"
82
-
83
82
#include "nodes/execnodes.h"
84
83
85
84
#include "replication/origin.h"
86
85
#include "replication/logical.h"
87
-
86
+ #include "pgstat.h"
88
87
#include "storage/fd.h"
89
88
#include "storage/ipc.h"
90
89
#include "storage/lmgr.h"
90
+ #include "storage/condition_variable.h"
91
91
#include "storage/copydir.h"
92
92
93
93
#include "utils/builtins.h"
@@ -124,6 +124,11 @@ typedef struct ReplicationState
124
124
*/
125
125
int acquired_by ;
126
126
127
+ /*
128
+ * Condition variable that's signalled when acquired_by changes.
129
+ */
130
+ ConditionVariable origin_cv ;
131
+
127
132
/*
128
133
* Lock protecting remote_lsn and local_lsn.
129
134
*/
@@ -324,16 +329,18 @@ replorigin_create(char *roname)
324
329
* Needs to be called in a transaction.
325
330
*/
326
331
void
327
- replorigin_drop (RepOriginId roident )
332
+ replorigin_drop (RepOriginId roident , bool nowait )
328
333
{
329
- HeapTuple tuple = NULL ;
334
+ HeapTuple tuple ;
330
335
Relation rel ;
331
336
int i ;
332
337
333
338
Assert (IsTransactionState ());
334
339
335
340
rel = heap_open (ReplicationOriginRelationId , ExclusiveLock );
336
341
342
+ restart :
343
+ tuple = NULL ;
337
344
/* cleanup the slot state info */
338
345
LWLockAcquire (ReplicationOriginLock , LW_EXCLUSIVE );
339
346
@@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident)
346
353
{
347
354
if (state -> acquired_by != 0 )
348
355
{
349
- ereport (ERROR ,
350
- (errcode (ERRCODE_OBJECT_IN_USE ),
351
- errmsg ("could not drop replication origin with OID %d, in use by PID %d" ,
352
- state -> roident ,
353
- state -> acquired_by )));
356
+ ConditionVariable * cv ;
357
+
358
+ if (nowait )
359
+ ereport (ERROR ,
360
+ (errcode (ERRCODE_OBJECT_IN_USE ),
361
+ errmsg ("could not drop replication origin with OID %d, in use by PID %d" ,
362
+ state -> roident ,
363
+ state -> acquired_by )));
364
+ cv = & state -> origin_cv ;
365
+
366
+ LWLockRelease (ReplicationOriginLock );
367
+ ConditionVariablePrepareToSleep (cv );
368
+ ConditionVariableSleep (cv , WAIT_EVENT_REPLICATION_ORIGIN_DROP );
369
+ ConditionVariableCancelSleep ();
370
+ goto restart ;
354
371
}
355
372
356
373
/* first WAL log */
@@ -382,7 +399,7 @@ replorigin_drop(RepOriginId roident)
382
399
383
400
CommandCounterIncrement ();
384
401
385
- /* now release lock again, */
402
+ /* now release lock again */
386
403
heap_close (rel , ExclusiveLock );
387
404
}
388
405
@@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void)
476
493
MemSet (replication_states , 0 , ReplicationOriginShmemSize ());
477
494
478
495
for (i = 0 ; i < max_replication_slots ; i ++ )
496
+ {
479
497
LWLockInitialize (& replication_states [i ].lock ,
480
498
replication_states_ctl -> tranche_id );
499
+ ConditionVariableInit (& replication_states [i ].origin_cv );
500
+ }
481
501
}
482
502
483
503
LWLockRegisterTranche (replication_states_ctl -> tranche_id ,
@@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush)
957
977
static void
958
978
ReplicationOriginExitCleanup (int code , Datum arg )
959
979
{
980
+ ConditionVariable * cv = NULL ;
981
+
960
982
LWLockAcquire (ReplicationOriginLock , LW_EXCLUSIVE );
961
983
962
984
if (session_replication_state != NULL &&
963
985
session_replication_state -> acquired_by == MyProcPid )
964
986
{
987
+ cv = & session_replication_state -> origin_cv ;
988
+
965
989
session_replication_state -> acquired_by = 0 ;
966
990
session_replication_state = NULL ;
967
991
}
968
992
969
993
LWLockRelease (ReplicationOriginLock );
994
+
995
+ if (cv )
996
+ ConditionVariableBroadcast (cv );
970
997
}
971
998
972
999
/*
@@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node)
1056
1083
session_replication_state -> acquired_by = MyProcPid ;
1057
1084
1058
1085
LWLockRelease (ReplicationOriginLock );
1086
+
1087
+ /* probably this one is pointless */
1088
+ ConditionVariableBroadcast (& session_replication_state -> origin_cv );
1059
1089
}
1060
1090
1061
1091
/*
@@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node)
1067
1097
void
1068
1098
replorigin_session_reset (void )
1069
1099
{
1100
+ ConditionVariable * cv ;
1101
+
1070
1102
Assert (max_replication_slots != 0 );
1071
1103
1072
1104
if (session_replication_state == NULL )
@@ -1077,9 +1109,12 @@ replorigin_session_reset(void)
1077
1109
LWLockAcquire (ReplicationOriginLock , LW_EXCLUSIVE );
1078
1110
1079
1111
session_replication_state -> acquired_by = 0 ;
1112
+ cv = & session_replication_state -> origin_cv ;
1080
1113
session_replication_state = NULL ;
1081
1114
1082
1115
LWLockRelease (ReplicationOriginLock );
1116
+
1117
+ ConditionVariableBroadcast (cv );
1083
1118
}
1084
1119
1085
1120
/*
@@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS)
1170
1205
roident = replorigin_by_name (name , false);
1171
1206
Assert (OidIsValid (roident ));
1172
1207
1173
- replorigin_drop (roident );
1208
+ replorigin_drop (roident , false );
1174
1209
1175
1210
pfree (name );
1176
1211
0 commit comments