16
16
#include "shard.h"
17
17
#include "timeutils.h"
18
18
19
+ /* epoll max events */
20
+ #define MAX_EVENTS 64
21
+
19
22
typedef enum
20
23
{
21
24
MOVEMPART_IN_PROGRESS ,
@@ -55,7 +58,7 @@ typedef struct
55
58
static void init_mmp_state (MoveMPartState * mmps , const char * part_name ,
56
59
int32 dst_node );
57
60
static void move_mparts (MoveMPartState * mmpss , int nparts );
58
- static int calc_timeout (slist_head * timeout_states );
61
+ static int calc_timeout (struct timespec waketm , bool waketm_set );
59
62
static ExecMoveMPartRes exec_move_mpart (MoveMPartState * mmps );
60
63
61
64
/*
@@ -260,18 +263,28 @@ move_mparts(MoveMPartState *mmpss, int nparts)
260
263
/* list of sleeping mmp states we need to wake after specified timeout */
261
264
slist_head timeout_states = SLIST_STATIC_INIT (timeout_states );
262
265
slist_iter iter ;
263
-
264
- int timeout ; /* at least one task will be ready after timeout millis */
266
+ /* at least one task will require our attention at waketm */
267
+ struct timespec waketm ;
268
+ /* Yes, we could use field of waketm for that. */
269
+ bool waketm_set ;
270
+ int timeout ;
265
271
int unfinished_moves = 0 ; /* number of not yet failed or succeeded tasks */
266
272
int i ;
267
273
int e ;
268
274
int epfd ;
275
+ struct epoll_event evlist [MAX_EVENTS ];
269
276
277
+ /* In the beginning, all tasks are ready for execution, so wake tm is right
278
+ * is actually current time. We also need to put all tasks to the
279
+ * timeout_states list to invoke them.
280
+ */
281
+ if ((e = clock_gettime (CLOCK_MONOTONIC , & waketm )) == -1 )
282
+ shmn_elog (FATAL , "clock_gettime failed, %s" , strerror (e ));
283
+ waketm_set = true;
270
284
for (i = 0 ; i < nparts ; i ++ )
271
285
{
272
286
if (mmpss [i ].result != MOVEMPART_FAILED )
273
287
{
274
- /* In the beginning, all tasks are ready immediately */
275
288
MoveMPartStateNode * mmps_node = palloc (sizeof (MoveMPartStateNode ));
276
289
elog (DEBUG4 , "Adding task %s to timeout list" , mmpss [i ].part_name );
277
290
mmps_node -> mmps = & mmpss [i ];
@@ -285,50 +298,46 @@ move_mparts(MoveMPartState *mmpss, int nparts)
285
298
shmn_elog (FATAL , "epoll_create1 failed" );
286
299
}
287
300
301
+ /* TODO: check for signals */
288
302
while (unfinished_moves > 0 )
289
303
{
290
- timeout = calc_timeout (& timeout_states );
304
+ timeout = calc_timeout (waketm , waketm_set );
305
+ e = epoll_wait (epfd , evlist , MAX_EVENTS , timeout );
306
+ if (e == -1 )
307
+ {
308
+ if (errno == EINTR )
309
+ continue ;
310
+ else
311
+ shmn_elog (FATAL , "epoll_wait failed, %s" , strerror (e ));
312
+ }
291
313
unfinished_moves -- ;
292
314
}
293
315
}
294
316
295
- /* Calculate when we need to wake if no epoll events are happening */
317
+ /*
318
+ * Calculate when we need to wake if no epoll events are happening.
319
+ * Returned value is ready for epoll_wait.
320
+ */
296
321
int
297
- calc_timeout (slist_head * timeout_states )
322
+ calc_timeout (struct timespec waketm , bool waketm_set )
298
323
{
299
- slist_iter iter ;
300
- struct timespec curtm ;
301
324
int e ;
302
- int timeout = -1 ; /* If no tasks wait for us, don't wake */
325
+ struct timespec curtm ;
326
+ int timeout ;
303
327
304
- slist_foreach (iter , timeout_states )
305
- {
306
- MoveMPartStateNode * mmps_node =
307
- slist_container (MoveMPartStateNode , list_node , iter .cur );
308
- MoveMPartState * mmps = mmps_node -> mmps ;
309
- shmn_elog (DEBUG1 , "Peeking into %s task wake time" , mmps -> part_name );
310
- if ((e = clock_gettime (CLOCK_MONOTONIC , & curtm )) == -1 )
311
- {
328
+ if (!waketm_set )
329
+ return -1 ;
330
+
331
+ if ((e = clock_gettime (CLOCK_MONOTONIC , & curtm )) == -1 )
312
332
shmn_elog (FATAL , "clock_gettime failed, %s" , strerror (e ));
313
- }
314
- if (timespeccmp (curtm , mmps -> waketm ) >= 0 )
315
- {
316
- shmn_elog (DEBUG1 , "Task %s is already ready" , mmps -> part_name );
317
- timeout = 0 ;
318
- return timeout ;
319
- }
320
- else
321
- {
322
- int diff = Max (0 , timespec_diff_millis (mmps -> waketm , curtm ));
323
- if (timeout == -1 )
324
- timeout = diff ;
325
- else
326
- timeout = Min (timeout , diff );
327
- shmn_elog (DEBUG1 , "Timeout set to %d due to task %s " ,
328
- timeout , mmps -> part_name );
329
- }
333
+ if (timespeccmp (waketm , curtm ) <= 0 )
334
+ {
335
+ shmn_elog (DEBUG1 , "Non-negative timeout, waking immediately" );
336
+ return 0 ;
330
337
}
331
338
339
+ timeout = Max (0 , timespec_diff_millis (waketm , curtm ));
340
+ shmn_elog (DEBUG1 , "Timeout is %d" , timeout );
332
341
return timeout ;
333
342
}
334
343
0 commit comments