Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a6f56a0

Browse files
committed
Main loop for move_mpart seems to be done.
1 parent 09af48e commit a6f56a0

File tree

1 file changed

+92
-5
lines changed

1 file changed

+92
-5
lines changed

src/shard.c

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "libpq-fe.h"
1010
#include "lib/ilist.h"
1111

12+
#include <unistd.h>
1213
#include <time.h>
1314
#include <limits.h>
1415
#include <sys/epoll.h>
@@ -59,6 +60,9 @@ static void init_mmp_state(MoveMPartState *mmps, const char *part_name,
5960
int32 dst_node);
6061
static void move_mparts(MoveMPartState *mmpss, int nparts);
6162
static int calc_timeout(struct timespec waketm, bool waketm_set);
63+
static void update_waketm(struct timespec *waketm, bool *waketm_set,
64+
MoveMPartState *mmps);
65+
static void epoll_subscribe(int epfd, MoveMPartState *mmps);
6266
static ExecMoveMPartRes exec_move_mpart(MoveMPartState *mmps);
6367

6468
/*
@@ -249,6 +253,7 @@ init_mmp_state(MoveMPartState *mmps, const char *part_name, int32 dst_node)
249253
{
250254
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
251255
}
256+
mmps->fd_to_epoll = -1;
252257
mmps->fd_in_epoll_set = -1;
253258

254259
mmps->result = MOVEMPART_IN_PROGRESS;
@@ -262,11 +267,12 @@ move_mparts(MoveMPartState *mmpss, int nparts)
262267
{
263268
/* list of sleeping mmp states we need to wake after specified timeout */
264269
slist_head timeout_states = SLIST_STATIC_INIT(timeout_states);
265-
slist_iter iter;
270+
slist_mutable_iter iter;
266271
/* at least one task will require our attention at waketm */
267272
struct timespec waketm;
268273
/* Yes, we could use field of waketm for that. */
269274
bool waketm_set;
275+
struct timespec curtm;
270276
int timeout;
271277
int unfinished_moves = 0; /* number of not yet failed or succeeded tasks */
272278
int i;
@@ -294,9 +300,7 @@ move_mparts(MoveMPartState *mmpss, int nparts)
294300
}
295301

296302
if ((epfd = epoll_create1(0)) == -1)
297-
{
298303
shmn_elog(FATAL, "epoll_create1 failed");
299-
}
300304

301305
/* TODO: check for signals */
302306
while (unfinished_moves > 0)
@@ -310,8 +314,47 @@ move_mparts(MoveMPartState *mmpss, int nparts)
310314
else
311315
shmn_elog(FATAL, "epoll_wait failed, %s", strerror(e));
312316
}
313-
unfinished_moves--;
317+
318+
/* Run all tasks for which it is time to wake */
319+
waketm_set = false; /* reset waketm */
320+
slist_foreach_modify(iter, &timeout_states)
321+
{
322+
MoveMPartStateNode *mmps_node =
323+
slist_container(MoveMPartStateNode, list_node, iter.cur);
324+
MoveMPartState *mmps = mmps_node->mmps;
325+
if ((e = clock_gettime(CLOCK_MONOTONIC, &curtm)) == -1)
326+
shmn_elog(FATAL, "clock_gettime failed, %s", strerror(e));
327+
328+
if (timespeccmp(mmps->waketm, curtm) <= 0)
329+
{
330+
shmn_elog(DEBUG1, "%s is ready for exec", mmps->part_name);
331+
switch (exec_move_mpart(mmps))
332+
{
333+
case EXECMOVEMPART_WAKEMEUP:
334+
/* We need to wake this task again, update waketm and
335+
* keep it in the list */
336+
update_waketm(&waketm, &waketm_set, mmps);
337+
continue;
338+
339+
case EXECMOVEMPART_EPOLL:
340+
/* Task wants to be wakened by epoll */
341+
epoll_subscribe(epfd, mmps);
342+
break;
343+
344+
case EXECMOVEMPART_DONE:
345+
/* Task is done, decrement the counter */
346+
unfinished_moves--;
347+
break;
348+
}
349+
/* If we are still here, remove node from timeouts_list */
350+
slist_delete_current(&iter);
351+
/* And free memory */
352+
pfree(mmps_node);
353+
}
354+
}
314355
}
356+
357+
close(epfd);
315358
}
316359

317360
/*
@@ -341,12 +384,56 @@ calc_timeout(struct timespec waketm, bool waketm_set)
341384
return timeout;
342385
}
343386

387+
/*
388+
* Update min waketm
389+
*/
390+
void
391+
update_waketm(struct timespec *waketm, bool *waketm_set, MoveMPartState *mmps)
392+
{
393+
if (!(*waketm_set) || timespeccmp(mmps->waketm, *waketm) < 0)
394+
{
395+
shmn_elog(DEBUG1, "Waketm updated, old s %d, new s %d",
396+
(int) waketm->tv_sec, (int) mmps->waketm.tv_sec);
397+
*waketm_set = true;
398+
*waketm = mmps->waketm;
399+
}
400+
}
401+
402+
/*
403+
* Ensure that mmps is registered in epoll and set proper mode.
404+
* We never remove fds from epoll, they should be removed automatically when
405+
* closed.
406+
*/
407+
void
408+
epoll_subscribe(int epfd, MoveMPartState *mmps)
409+
{
410+
struct epoll_event ev;
411+
int e;
412+
413+
ev.data.ptr = mmps;
414+
ev.events = EPOLLIN | EPOLLONESHOT;
415+
Assert(mmps->fd_to_epoll != -1);
416+
if (mmps->fd_to_epoll == mmps->fd_in_epoll_set)
417+
{
418+
if ((e = epoll_ctl(epfd, EPOLL_CTL_MOD, mmps->fd_to_epoll, &ev)) == -1)
419+
shmn_elog(FATAL, "epoll_ctl failed, %s", strerror(e));
420+
}
421+
else
422+
{
423+
if ((e = epoll_ctl(epfd, EPOLL_CTL_ADD, mmps->fd_to_epoll, &ev)) == -1)
424+
shmn_elog(FATAL, "epoll_ctl failed, %s", strerror(e));
425+
mmps->fd_in_epoll_set = mmps->fd_to_epoll;
426+
}
427+
shmn_elog(DEBUG1, "socket for task %s added to epoll", mmps->part_name);
428+
}
429+
344430
/*
345431
* Actually run MoveMPart state machine. Return value says when (if ever)
346432
* we want to be executed again.
347433
*/
348434
ExecMoveMPartRes
349435
exec_move_mpart(MoveMPartState *mmps)
350436
{
351-
437+
shmn_elog(DEBUG1, "Partition %s is moved", mmps->part_name);
438+
return EXECMOVEMPART_DONE;
352439
}

0 commit comments

Comments
 (0)