@@ -202,12 +202,19 @@ typedef struct QueuePosition
202
202
(x).page != (y).page ? (y) : \
203
203
(x).offset < (y).offset ? (x) : (y))
204
204
205
+ /* choose logically larger QueuePosition */
206
+ #define QUEUE_POS_MAX (x ,y ) \
207
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
208
+ (x).page != (y).page ? (x) : \
209
+ (x).offset > (y).offset ? (x) : (y))
210
+
205
211
/*
206
212
* Struct describing a listening backend's status
207
213
*/
208
214
typedef struct QueueBackendStatus
209
215
{
210
216
int32 pid ; /* either a PID or InvalidPid */
217
+ Oid dboid ; /* backend's database OID, or InvalidOid */
211
218
QueuePosition pos ; /* backend has read queue up to here */
212
219
} QueueBackendStatus ;
213
220
@@ -235,8 +242,8 @@ typedef struct QueueBackendStatus
235
242
typedef struct AsyncQueueControl
236
243
{
237
244
QueuePosition head ; /* head points to the next free location */
238
- QueuePosition tail ; /* the global tail is equivalent to the pos
239
- * of the "slowest" backend */
245
+ QueuePosition tail ; /* the global tail is equivalent to the pos of
246
+ * the "slowest" backend */
240
247
TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
241
248
QueueBackendStatus backend [FLEXIBLE_ARRAY_MEMBER ];
242
249
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -247,6 +254,7 @@ static AsyncQueueControl *asyncQueueControl;
247
254
#define QUEUE_HEAD (asyncQueueControl->head)
248
255
#define QUEUE_TAIL (asyncQueueControl->tail)
249
256
#define QUEUE_BACKEND_PID (i ) (asyncQueueControl->backend[i].pid)
257
+ #define QUEUE_BACKEND_DBOID (i ) (asyncQueueControl->backend[i].dboid)
250
258
#define QUEUE_BACKEND_POS (i ) (asyncQueueControl->backend[i].pos)
251
259
252
260
/*
@@ -461,6 +469,7 @@ AsyncShmemInit(void)
461
469
for (i = 0 ; i <= MaxBackends ; i ++ )
462
470
{
463
471
QUEUE_BACKEND_PID (i ) = InvalidPid ;
472
+ QUEUE_BACKEND_DBOID (i ) = InvalidOid ;
464
473
SET_QUEUE_POS (QUEUE_BACKEND_POS (i ), 0 , 0 );
465
474
}
466
475
}
@@ -907,6 +916,10 @@ AtCommit_Notify(void)
907
916
static void
908
917
Exec_ListenPreCommit (void )
909
918
{
919
+ QueuePosition head ;
920
+ QueuePosition max ;
921
+ int i ;
922
+
910
923
/*
911
924
* Nothing to do if we are already listening to something, nor if we
912
925
* already ran this routine in this transaction.
@@ -934,10 +947,34 @@ Exec_ListenPreCommit(void)
934
947
* over already-committed notifications. This ensures we cannot miss any
935
948
* not-yet-committed notifications. We might get a few more but that
936
949
* doesn't hurt.
950
+ *
951
+ * In some scenarios there might be a lot of committed notifications that
952
+ * have not yet been pruned away (because some backend is being lazy about
953
+ * reading them). To reduce our startup time, we can look at other
954
+ * backends and adopt the maximum "pos" pointer of any backend that's in
955
+ * our database; any notifications it's already advanced over are surely
956
+ * committed and need not be re-examined by us. (We must consider only
957
+ * backends connected to our DB, because others will not have bothered to
958
+ * check committed-ness of notifications in our DB.) But we only bother
959
+ * with that if there's more than a page worth of notifications
960
+ * outstanding, otherwise scanning all the other backends isn't worth it.
961
+ *
962
+ * We need exclusive lock here so we can look at other backends' entries.
937
963
*/
938
- LWLockAcquire (AsyncQueueLock , LW_SHARED );
939
- QUEUE_BACKEND_POS (MyBackendId ) = QUEUE_TAIL ;
964
+ LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
965
+ head = QUEUE_HEAD ;
966
+ max = QUEUE_TAIL ;
967
+ if (QUEUE_POS_PAGE (max ) != QUEUE_POS_PAGE (head ))
968
+ {
969
+ for (i = 1 ; i <= MaxBackends ; i ++ )
970
+ {
971
+ if (QUEUE_BACKEND_DBOID (i ) == MyDatabaseId )
972
+ max = QUEUE_POS_MAX (max , QUEUE_BACKEND_POS (i ));
973
+ }
974
+ }
975
+ QUEUE_BACKEND_POS (MyBackendId ) = max ;
940
976
QUEUE_BACKEND_PID (MyBackendId ) = MyProcPid ;
977
+ QUEUE_BACKEND_DBOID (MyBackendId ) = MyDatabaseId ;
941
978
LWLockRelease (AsyncQueueLock );
942
979
943
980
/* Now we are listed in the global array, so remember we're listening */
@@ -953,7 +990,8 @@ Exec_ListenPreCommit(void)
953
990
*
954
991
* This will also advance the global tail pointer if possible.
955
992
*/
956
- asyncQueueReadAllNotifications ();
993
+ if (!QUEUE_POS_EQUAL (max , head ))
994
+ asyncQueueReadAllNotifications ();
957
995
}
958
996
959
997
/*
@@ -1156,6 +1194,7 @@ asyncQueueUnregister(void)
1156
1194
QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ), QUEUE_TAIL );
1157
1195
/* ... then mark it invalid */
1158
1196
QUEUE_BACKEND_PID (MyBackendId ) = InvalidPid ;
1197
+ QUEUE_BACKEND_DBOID (MyBackendId ) = InvalidOid ;
1159
1198
LWLockRelease (AsyncQueueLock );
1160
1199
1161
1200
/* mark ourselves as no longer listed in the global array */
0 commit comments