Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 8a12ea5

Browse files
committed
[PGPRO-4074] DeleteWaitEvent() for event sets.
Cherry-pick of c82e0d53f5. Note: I've blindly adapted to kqueue, need to test it. tags: multimaster, connpool. (cherry picked from commit 434471857301439c8b2a48dd1594bc608c65d23a)
1 parent 652f552 commit 8a12ea5

File tree

2 files changed

+157
-43
lines changed

2 files changed

+157
-43
lines changed

src/backend/storage/ipc/latch.c

Lines changed: 155 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -78,18 +78,36 @@
7878
#error "no wait set implementation available"
7979
#endif
8080

81+
/*
82+
* Connection pooler and mtm need to delete events from event set.
83+
* Since we have to preserve the positions of all other events,
84+
* we cannot move events. So we have to maintain a list of free events.
85+
* But poll/WaitForMultipleObjects operate on an array of listened events.
86+
* That is why elements in pollfds and handle arrays should be stored without holes
87+
* and we need to maintain mapping between them and WaitEventSet events.
88+
* This mapping is stored in "permutation" array. Also we need backward mapping
89+
* (from event to descriptors array) which is implemented using "index" field of WaitEvent.
90+
*/
91+
8192
/* typedef in latch.h */
8293
struct WaitEventSet
8394
{
8495
int nevents; /* number of registered events */
8596
int nevents_space; /* maximum number of events in this set */
8697

98+
/*
99+
* Singly-linked list of free events linked by "pos" and terminated by -1.
100+
*/
101+
int free_events;
102+
87103
/*
88104
* Array, of nevents_space length, storing the definition of events this
89105
* set is waiting for.
90106
*/
91107
WaitEvent *events;
92108

109+
int *permutation; /* indexes of used events (see comment above) */
110+
93111
/*
94112
* If WL_LATCH_SET is specified in any wait event, latch is a pointer to
95113
* said latch, and latch_pos the offset in the ->events array. This is
@@ -150,9 +168,9 @@ static void WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action
150168
#elif defined(WAIT_USE_KQUEUE)
151169
static void WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events);
152170
#elif defined(WAIT_USE_POLL)
153-
static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event);
171+
static void WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove);
154172
#elif defined(WAIT_USE_WIN32)
155-
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event);
173+
static void WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove);
156174
#endif
157175

158176
static inline int WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
@@ -574,6 +592,7 @@ CreateWaitEventSet(MemoryContext context, int nevents)
574592
*/
575593
sz += MAXALIGN(sizeof(WaitEventSet));
576594
sz += MAXALIGN(sizeof(WaitEvent) * nevents);
595+
sz += MAXALIGN(sizeof(int) * nevents);
577596

578597
#if defined(WAIT_USE_EPOLL)
579598
sz += MAXALIGN(sizeof(struct epoll_event) * nevents);
@@ -594,23 +613,23 @@ CreateWaitEventSet(MemoryContext context, int nevents)
594613
set->events = (WaitEvent *) data;
595614
data += MAXALIGN(sizeof(WaitEvent) * nevents);
596615

616+
set->permutation = (int *) data;
617+
data += MAXALIGN(sizeof(int) * nevents);
618+
597619
#if defined(WAIT_USE_EPOLL)
598620
set->epoll_ret_events = (struct epoll_event *) data;
599-
data += MAXALIGN(sizeof(struct epoll_event) * nevents);
600621
#elif defined(WAIT_USE_KQUEUE)
601622
set->kqueue_ret_events = (struct kevent *) data;
602-
data += MAXALIGN(sizeof(struct kevent) * nevents);
603623
#elif defined(WAIT_USE_POLL)
604624
set->pollfds = (struct pollfd *) data;
605-
data += MAXALIGN(sizeof(struct pollfd) * nevents);
606625
#elif defined(WAIT_USE_WIN32)
607-
set->handles = (HANDLE) data;
608-
data += MAXALIGN(sizeof(HANDLE) * nevents);
626+
set->handles = (HANDLE*) data;
609627
#endif
610628

611629
set->latch = NULL;
612630
set->nevents_space = nevents;
613631
set->exit_on_postmaster_death = false;
632+
set->free_events = -1;
614633

615634
#if defined(WAIT_USE_EPOLL)
616635
if (!AcquireExternalFD())
@@ -702,12 +721,11 @@ FreeWaitEventSet(WaitEventSet *set)
702721
close(set->kqueue_fd);
703722
ReleaseExternalFD();
704723
#elif defined(WAIT_USE_WIN32)
705-
WaitEvent *cur_event;
724+
int i;
706725

707-
for (cur_event = set->events;
708-
cur_event < (set->events + set->nevents);
709-
cur_event++)
726+
for (i = 0; i < set->nevents; i++)
710727
{
728+
WaitEvent* cur_event = &set->events[set->permutation[i]];
711729
if (cur_event->events & WL_LATCH_SET)
712730
{
713731
/* uses the latch's HANDLE */
@@ -720,7 +738,7 @@ FreeWaitEventSet(WaitEventSet *set)
720738
{
721739
/* Clean up the event object we created for the socket */
722740
WSAEventSelect(cur_event->fd, NULL, 0);
723-
WSACloseEvent(set->handles[cur_event->pos + 1]);
741+
WSACloseEvent(set->handles[cur_event->index + 1]);
724742
}
725743
}
726744
#endif
@@ -761,6 +779,7 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
761779
void *user_data)
762780
{
763781
WaitEvent *event;
782+
int free_event;
764783

765784
/* not enough space */
766785
Assert(set->nevents < set->nevents_space);
@@ -790,8 +809,20 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
790809
if (fd == PGINVALID_SOCKET && (events & WL_SOCKET_MASK))
791810
elog(ERROR, "cannot wait on socket event without a socket");
792811

793-
event = &set->events[set->nevents];
794-
event->pos = set->nevents++;
812+
free_event = set->free_events;
813+
if (free_event >= 0)
814+
{
815+
event = &set->events[free_event];
816+
set->free_events = event->pos;
817+
event->pos = free_event;
818+
}
819+
else
820+
{
821+
event = &set->events[set->nevents];
822+
event->pos = set->nevents;
823+
}
824+
set->permutation[set->nevents] = event->pos;
825+
event->index = set->nevents++;
795826
event->fd = fd;
796827
event->events = events;
797828
event->user_data = user_data;
@@ -820,14 +851,54 @@ AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd, Latch *latch,
820851
#elif defined(WAIT_USE_KQUEUE)
821852
WaitEventAdjustKqueue(set, event, 0);
822853
#elif defined(WAIT_USE_POLL)
823-
WaitEventAdjustPoll(set, event);
854+
WaitEventAdjustPoll(set, event, false);
824855
#elif defined(WAIT_USE_WIN32)
825-
WaitEventAdjustWin32(set, event);
856+
WaitEventAdjustWin32(set, event, false);
826857
#endif
827858

828859
return event->pos;
829860
}
830861

862+
/*
863+
* Remove the event at the specified position from the event set.
864+
*
865+
* 'pos' is the id returned by AddWaitEventToSet. The freed slot is pushed onto set->free_events so that AddWaitEventToSet can reuse it; all other events keep their positions.
866+
*/
867+
void
868+
DeleteWaitEvent(WaitEventSet *set, int pos)
869+
{
870+
WaitEvent *event;
871+
#if defined(WAIT_USE_KQUEUE)
872+
int old_events;
873+
#endif
874+
875+
Assert(pos < set->nevents_space); /* bounded by capacity, not nevents: used slots may be sparse. NOTE(review): no lower-bound or "slot in use" check on pos */
876+
event = &set->events[pos];
877+
878+
#if defined(WAIT_USE_EPOLL)
879+
WaitEventAdjustEpoll(set, event, EPOLL_CTL_DEL); /* unregister the fd via epoll_ctl() */
880+
#elif defined(WAIT_USE_KQUEUE)
881+
old_events = event->events; /* remember the mask that was registered */
882+
event->events = 0; /* empty new mask; presumably makes the adjust routine drop the old filters - confirm against WaitEventAdjustKqueue */
883+
WaitEventAdjustKqueue(set, event, old_events);
884+
#elif defined(WAIT_USE_POLL)
885+
WaitEventAdjustPoll(set, event, true); /* overwrites this event's pollfd with the last pollfd */
886+
#elif defined(WAIT_USE_WIN32)
887+
WaitEventAdjustWin32(set, event, true); /* closes the event handle and moves the last handle into its place */
888+
#endif
889+
if (--set->nevents != 0) /* keep "permutation" dense: the last entry takes over the vacated index */
890+
{
891+
set->permutation[event->index] = set->permutation[set->nevents]; /* after the decrement, nevents is the index of the former last entry */
892+
set->events[set->permutation[set->nevents]].index = event->index; /* fix the moved event's backward mapping */
893+
}
894+
event->fd = PGINVALID_SOCKET; /* mark the slot as unused */
895+
event->events = 0;
896+
event->index = -1; /* no longer present in the descriptor array */
897+
event->pos = set->free_events; /* "pos" doubles as the free-list link */
898+
set->free_events = pos; /* the freed slot becomes the new head of the free list */
899+
}
900+
901+
831902
/*
832903
* Change the event mask and, in the WL_LATCH_SET case, the latch associated
833904
* with the WaitEvent.
@@ -842,7 +913,7 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
842913
int old_events;
843914
#endif
844915

845-
Assert(pos < set->nevents);
916+
Assert(pos < set->nevents_space);
846917

847918
event = &set->events[pos];
848919
#if defined(WAIT_USE_KQUEUE)
@@ -884,9 +955,9 @@ ModifyWaitEvent(WaitEventSet *set, int pos, uint32 events, Latch *latch)
884955
#elif defined(WAIT_USE_KQUEUE)
885956
WaitEventAdjustKqueue(set, event, old_events);
886957
#elif defined(WAIT_USE_POLL)
887-
WaitEventAdjustPoll(set, event);
958+
WaitEventAdjustPoll(set, event, false);
888959
#elif defined(WAIT_USE_WIN32)
889-
WaitEventAdjustWin32(set, event);
960+
WaitEventAdjustWin32(set, event, false);
890961
#endif
891962
}
892963

@@ -933,7 +1004,20 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
9331004
*/
9341005
rc = epoll_ctl(set->epoll_fd, action, event->fd, &epoll_ev);
9351006

936-
if (rc < 0)
1007+
/*
1008+
* Skip throwing error in case of EPOLL_CTL_DEL. Upon connection error
1009+
* libpq may or may not close the socket, so epfd can disappear.
1010+
*
1011+
* XXX it is not entirely clear which errnos should be checked
1012+
* here. According to the man pages I would say it is 'EBADF' (closed socket is
1013+
* not valid, right?), any simple test on my 5.1.11 debian agrees with
1014+
* that. However, msvs-6-3 bf machine with 2.6.32 spits out ENOENT (under
1015+
* dmq) despite evidently correct usage (we don't DEL the same fd
1016+
* twice). EINVAL was also historically checked here.
1017+
*/
1018+
if (rc < 0 &&
1019+
!(action == EPOLL_CTL_DEL &&
1020+
(errno == EBADF || errno == EINVAL || errno == ENOENT)))
9371021
ereport(ERROR,
9381022
(errcode_for_socket_access(),
9391023
/* translator: %s is a syscall name, such as "poll()" */
@@ -944,11 +1028,16 @@ WaitEventAdjustEpoll(WaitEventSet *set, WaitEvent *event, int action)
9441028

9451029
#if defined(WAIT_USE_POLL)
9461030
static void
947-
WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event)
1031+
WaitEventAdjustPoll(WaitEventSet *set, WaitEvent *event, bool remove)
9481032
{
949-
struct pollfd *pollfd = &set->pollfds[event->pos];
1033+
struct pollfd *pollfd = &set->pollfds[event->index];
1034+
1035+
if (remove)
1036+
{
1037+
*pollfd = set->pollfds[set->nevents - 1]; /* nevents is not decremented yet */
1038+
return;
1039+
}
9501040

951-
pollfd->revents = 0;
9521041
pollfd->fd = event->fd;
9531042

9541043
/* prepare pollfd entry once */
@@ -1088,7 +1177,11 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
10881177
if (event->events == WL_POSTMASTER_DEATH &&
10891178
(errno == ESRCH || errno == EACCES))
10901179
set->report_postmaster_not_running = true;
1091-
else
1180+
/*
1181+
* Like in WaitEventAdjustEpoll, don't throw if we are trying to
1182+
* remove an already closed socket. FIXME: ensure this check is right.
1183+
*/
1184+
else if (!(event->events == 0 && errno == EBADF ))
10921185
ereport(ERROR,
10931186
(errcode_for_socket_access(),
10941187
/* translator: %s is a syscall name, such as "poll()" */
@@ -1112,9 +1205,21 @@ WaitEventAdjustKqueue(WaitEventSet *set, WaitEvent *event, int old_events)
11121205

11131206
#if defined(WAIT_USE_WIN32)
11141207
static void
1115-
WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event)
1208+
WaitEventAdjustWin32(WaitEventSet *set, WaitEvent *event, bool remove)
11161209
{
1117-
HANDLE *handle = &set->handles[event->pos + 1];
1210+
HANDLE *handle = &set->handles[event->index + 1];
1211+
1212+
if (remove)
1213+
{
1214+
Assert(event->fd != PGINVALID_SOCKET);
1215+
1216+
if (*handle != WSA_INVALID_EVENT)
1217+
WSACloseEvent(*handle);
1218+
1219+
*handle = set->handles[set->nevents]; /* nevents is not decremented yet but we need to add 1 to the index */
1220+
set->handles[set->nevents] = WSA_INVALID_EVENT;
1221+
return;
1222+
}
11181223

11191224
if (event->events == WL_LATCH_SET)
11201225
{
@@ -1562,11 +1667,12 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
15621667
{
15631668
int returned_events = 0;
15641669
int rc;
1565-
WaitEvent *cur_event;
1566-
struct pollfd *cur_pollfd;
1670+
int i;
1671+
struct pollfd *cur_pollfd = set->pollfds;
1672+
WaitEvent* cur_event;
15671673

15681674
/* Sleep */
1569-
rc = poll(set->pollfds, set->nevents, (int) cur_timeout);
1675+
rc = poll(cur_pollfd, set->nevents, (int) cur_timeout);
15701676

15711677
/* Check return code */
15721678
if (rc < 0)
@@ -1589,15 +1695,13 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
15891695
return -1;
15901696
}
15911697

1592-
for (cur_event = set->events, cur_pollfd = set->pollfds;
1593-
cur_event < (set->events + set->nevents) &&
1594-
returned_events < nevents;
1595-
cur_event++, cur_pollfd++)
1698+
for (i = 0; i < set->nevents && returned_events < nevents; i++, cur_pollfd++)
15961699
{
15971700
/* no activity on this FD, skip */
15981701
if (cur_pollfd->revents == 0)
15991702
continue;
16001703

1704+
cur_event = &set->events[set->permutation[i]];
16011705
occurred_events->pos = cur_event->pos;
16021706
occurred_events->user_data = cur_event->user_data;
16031707
occurred_events->events = 0;
@@ -1688,17 +1792,25 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
16881792
WaitEvent *occurred_events, int nevents)
16891793
{
16901794
int returned_events = 0;
1795+
int i;
16911796
DWORD rc;
1692-
WaitEvent *cur_event;
1797+
WaitEvent* cur_event;
16931798

16941799
/* Reset any wait events that need it */
1695-
for (cur_event = set->events;
1696-
cur_event < (set->events + set->nevents);
1697-
cur_event++)
1698-
{
1699-
if (cur_event->reset)
1700-
{
1701-
WaitEventAdjustWin32(set, cur_event);
1800+
for (i = 0; i < set->nevents; i++)
1801+
{
1802+
cur_event = &set->events[set->permutation[i]];
1803+
1804+
/*
1805+
* There is a problem on Windows where SSPI connections "hang" in WaitForMultipleObjects, which
1806+
* doesn't signal presence of input data (while it is possible to read this data from the socket).
1807+
* Looks like "reset" logic is not completely correct (resetting event just after
1808+
* receiving the previous read event). Resetting all read events fixes this problem.
1809+
*/
1810+
if (cur_event->events & WL_SOCKET_READABLE)
1811+
/* if (cur_event->reset) */
1812+
{
1813+
WaitEventAdjustWin32(set, cur_event, false);
17021814
cur_event->reset = false;
17031815
}
17041816

@@ -1764,7 +1876,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
17641876
* With an offset of one, due to the always present pgwin32_signal_event,
17651877
* the handle offset directly corresponds to a wait event.
17661878
*/
1767-
cur_event = (WaitEvent *) &set->events[rc - WAIT_OBJECT_0 - 1];
1879+
cur_event = (WaitEvent *) &set->events[set->permutation[rc - WAIT_OBJECT_0 - 1]];
17681880

17691881
occurred_events->pos = cur_event->pos;
17701882
occurred_events->user_data = cur_event->user_data;
@@ -1805,7 +1917,7 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
18051917
else if (cur_event->events & WL_SOCKET_MASK)
18061918
{
18071919
WSANETWORKEVENTS resEvents;
1808-
HANDLE handle = set->handles[cur_event->pos + 1];
1920+
HANDLE handle = set->handles[cur_event->index + 1];
18091921

18101922
Assert(cur_event->fd);
18111923

0 commit comments

Comments
 (0)