Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 29d51e1

Browse files
committed
dmq integration: fix PQconnectPoll check sequence according to documentation
1 parent 3f4d528 commit 29d51e1

File tree

1 file changed

+28
-4
lines changed

1 file changed

+28
-4
lines changed

dmq.c

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
* function that will switch fe/be protocol to a COPY mode and enters endless
1818
* receiving loop.
1919
*
20+
* XXX: needs better PQerror reporting logic -- perhaps once per given Idle
21+
* connection.
22+
*
23+
* XXX: is there max size for a connstr?
2024
*
2125
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
2226
* Portions Copyright (c) 1994, Regents of the University of California
@@ -457,6 +461,10 @@ dmq_sender_main(Datum main_arg)
457461

458462
/*
459463
* Generate timeout or socket events.
464+
*
465+
*
466+
* XXX: here we expect that whole cyle takes less then 250-100 ms.
467+
* Otherwise we can stuck with timer_event forever.
460468
*/
461469
now_millisec = dmq_now();
462470
if (now_millisec - prev_timer_at > 250)
@@ -466,7 +474,7 @@ dmq_sender_main(Datum main_arg)
466474
}
467475
else
468476
{
469-
nevents = WaitEventSetWait(set, wait ? 250 : 0, &event,
477+
nevents = WaitEventSetWait(set, wait ? 100 : 0, &event,
470478
1, PG_WAIT_EXTENSION);
471479
}
472480

@@ -506,8 +514,9 @@ dmq_sender_main(Datum main_arg)
506514
NULL, (void *) conn_id);
507515

508516
mtm_log(DmqStateIntermediate,
509-
"[DMQ] switching %s from Idle to Connecting",
510-
conns[conn_id].receiver_name);
517+
"[DMQ] switching %s from Idle to Connecting on '%s'",
518+
conns[conn_id].receiver_name,
519+
conns[conn_id].connstr);
511520
}
512521
}
513522
/* Heatbeat */
@@ -547,9 +556,24 @@ dmq_sender_main(Datum main_arg)
547556
{
548557
PostgresPollingStatusType status = PQconnectPoll(conns[conn_id].pgconn);
549558

559+
mtm_log(DmqStateIntermediate,
560+
"[DMQ] Connecting: PostgresPollingStatusType = %d on %s",
561+
status,
562+
conns[conn_id].receiver_name);
563+
550564
if (status == PGRES_POLLING_READING)
551565
{
552566
ModifyWaitEvent(set, event.pos, WL_SOCKET_READABLE, NULL);
567+
mtm_log(DmqStateIntermediate,
568+
"[DMQ] Connecting: modify wait event to WL_SOCKET_READABLE on %s",
569+
conns[conn_id].receiver_name);
570+
}
571+
else if (status == PGRES_POLLING_WRITING)
572+
{
573+
ModifyWaitEvent(set, event.pos, WL_SOCKET_WRITEABLE, NULL);
574+
mtm_log(DmqStateIntermediate,
575+
"[DMQ] Connecting: modify wait event to WL_SOCKET_WRITEABLE on %s",
576+
conns[conn_id].receiver_name);
553577
}
554578
else if (status == PGRES_POLLING_OK)
555579
{
@@ -577,7 +601,7 @@ dmq_sender_main(Datum main_arg)
577601
PQerrorMessage(conns[conn_id].pgconn));
578602
}
579603
else
580-
Assert(status == PGRES_POLLING_WRITING);
604+
Assert(false);
581605

582606
break;
583607
}

0 commit comments

Comments
 (0)