@@ -51,7 +51,8 @@
 #define DMQ_MQ_MAGIC 0x646d71
 
 /* XXX: move to common */
-#define BIT_CHECK(mask, bit) (((mask) & ((int64)1 << (bit))) != 0)
+#define BIT_CLEAR(mask, bit) ((mask) &= ~((uint64)1 << (bit)))
+#define BIT_CHECK(mask, bit) (((mask) & ((uint64)1 << (bit))) != 0)
 
 /*
  * Shared data structures to hold current connections topology.
@@ -83,6 +84,7 @@ typedef struct
     PGconn     *pgconn;
     DmqConnState state;
     int         pos;
+    int8        mask_pos;
 } DmqDestination;
 
 typedef struct
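Participant sets elsewhere in dmq are plain uint64 bitmasks indexed by this mask_pos and manipulated with the BIT_CHECK/BIT_CLEAR helpers from the first hunk (the cast is now uint64, presumably so the high bit can be used without shifting into the sign bit of a signed type). A minimal sketch, with made-up mask positions:

/* Sketch only; the mask positions are hypothetical. */
static void
participants_mask_example(void)
{
    uint64      participants = 0;

    participants |= (uint64) 1 << 2;    /* node with mask_pos 2 joins */
    participants |= (uint64) 1 << 5;    /* node with mask_pos 5 joins */

    if (BIT_CHECK(participants, 5))
        BIT_CLEAR(participants, 5);     /* drop it, e.g. after a conn failure */

    Assert(BIT_CHECK(participants, 2));
    Assert(!BIT_CHECK(participants, 5));
}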
@@ -101,6 +103,43 @@ struct DmqSharedState
     pid_t       sender_pid;
     dsm_handle  out_dsm;
     DmqDestination destinations[DMQ_MAX_DESTINATIONS];
+    /*
+     * Stores counters incremented on each reconnect to a destination,
+     * indexed by receiver mask_pos. This lets us detect conn failures and
+     * avoid waiting forever for a response when the request could have
+     * been dropped, cf. dmq_get_sendconn_cnt and
+     * dmq_purge_failed_participants.
+     *
+     * XXX This mechanism is unreliable and ugly.
+     * Unreliable because, although it saves us from waiting forever for a
+     * reply, it doesn't save us from potential deadlocks. Deadlocks may
+     * arise whenever a loop of nodes performs request-response sequences,
+     * because request A->B and A's response to B's request go via the same
+     * TCP channel; thus, if all queues of the loop in one direction are
+     * filled with requests, nobody will be able to answer.
+     *
+     * We could control the situation by ensuring that 1) all requests a
+     * node may send at a time fit in the output buffers and 2) a node
+     * never repeats a request until the previous one is delivered or
+     * dropped. However, to honor 2), we would have to terminate the send
+     * connection whenever the receive conn fails (since we are going to
+     * retry the requests) to flush possibly undelivered previous requests,
+     * which we don't do currently.
+     *
+     * The ultimate non-deadlockable solution, without such hacks, would be
+     * to divide the job between different channels: A sends its requests
+     * to B and receives responses from it via one TCP channel, and B sends
+     * its requests to A and receives responses via another one. Probably
+     * this is not worthwhile though, as it would make dmq more complicated
+     * and increase the number of shm_mqs.
+     *
+     * Besides, the counters are ugly because they require the external
+     * code to remember the sender counters before a request and check them
+     * while waiting for the reply; moreover, this checking must be based
+     * on timeouts, as nobody wakes the clients on send conn failures.
+     *
+     * No locks are used because we don't care much about correct/up-to-date
+     * reads, though well-aligned ints are atomic anyway.
+     */
+    volatile int sconn_cnt[DMQ_MAX_DESTINATIONS];
 
     /* receivers stuff */
     int         n_receivers;
@@ -115,6 +154,9 @@ struct DmqSharedState
 
 } *dmq_state;
 
+/* special value for sconn_cnt[] meaning the connection is dead */
+#define DMQSCONN_DEAD 0
+
 static HTAB *dmq_subscriptions;
 
 /* Backend-local i/o queues. */
@@ -332,6 +374,11 @@ dmq_sender_main(Datum main_arg)
     WaitEventSet *set;
     DmqDestination conns[DMQ_MAX_DESTINATIONS];
     int         heartbeat_send_timeout = DatumGetInt32(main_arg);
+    /*
+     * Local copy of dmq_state->sconn_cnt, preserving the counter value
+     * while the conn is dead.
+     */
+    int         sconn_cnt[DMQ_MAX_DESTINATIONS];
 
     double      prev_timer_at = dmq_now();
 
@@ -403,6 +450,8 @@ dmq_sender_main(Datum main_arg)
                conns[i] = *dest;
                Assert(conns[i].pgconn == NULL);
                conns[i].state = Idle;
+               sconn_cnt[dest->mask_pos] = 0;
+               dmq_state->sconn_cnt[dest->mask_pos] = DMQSCONN_DEAD;
                prev_timer_at = 0;  /* do not wait for timer event */
            }
            /* close connection to deleted destination */
@@ -443,6 +492,7 @@ dmq_sender_main(Datum main_arg)
                if (ret < 0)
                {
                    conns[conn_id].state = Idle;
+                   dmq_state->sconn_cnt[conns[conn_id].mask_pos] = DMQSCONN_DEAD;
 
                    mtm_log(DmqStateFinal,
                            "[DMQ] failed to send message to %s: %s",
@@ -561,6 +611,7 @@ dmq_sender_main(Datum main_arg)
                if (ret < 0)
                {
                    conns[conn_id].state = Idle;
+                   dmq_state->sconn_cnt[conns[conn_id].mask_pos] = DMQSCONN_DEAD;
 
                    mtm_log(DmqStateFinal,
                            "[DMQ] failed to send heartbeat to %s: %s",
@@ -684,9 +735,13 @@ dmq_sender_main(Datum main_arg)
                }
                if (!PQisBusy(conns[conn_id].pgconn))
                {
+                   int8        mask_pos = conns[conn_id].mask_pos;
+
                    conns[conn_id].state = Active;
                    DeleteWaitEvent(set, event.pos);
                    PQsetnonblocking(conns[conn_id].pgconn, 1);
+                   sconn_cnt[mask_pos]++;
+                   dmq_state->sconn_cnt[mask_pos] = sconn_cnt[mask_pos];
 
                    mtm_log(DmqStateFinal,
                            "[DMQ] Connected to %s",
@@ -702,6 +757,7 @@ dmq_sender_main(Datum main_arg)
                if (!PQconsumeInput(conns[conn_id].pgconn))
                {
                    conns[conn_id].state = Idle;
+                   dmq_state->sconn_cnt[conns[conn_id].mask_pos] = DMQSCONN_DEAD;
 
                    mtm_log(DmqStateFinal,
                            "[DMQ] connection error with %s: %s",
@@ -1444,6 +1500,9 @@ dmq_detach_receiver(char *sender_name)
        dmq_local.inhandles[handle_id].name[0] = '\0';
 }
 
+/*
+ * Subscribes the caller to messages from stream_name.
+ */
 void
 dmq_stream_subscribe(char *stream_name)
 {
@@ -1503,6 +1562,24 @@ dmq_stream_unsubscribe(char *stream_name)
    Assert(found);
 }
 
+/*
+ * Fills the (preallocated) sconn_cnt array with the current values of the
+ * sender connection counters for the members of the participants mask (as
+ * registered in dmq_destination_add). This allows revealing connection
+ * failures that may have dropped the request -- and thus to stop hopeless
+ * waiting for the response.
+ */
+void
+dmq_get_sendconn_cnt(uint64 participants, int *sconn_cnt)
+{
+   int         i;
+
+   for (i = 0; i < DMQ_N_MASK_POS; i++)
+   {
+       if (BIT_CHECK(participants, i))
+           sconn_cnt[i] = dmq_state->sconn_cnt[i];
+   }
+}
+
 bool
 dmq_pop(int8 *sender_mask_pos, StringInfo msg, uint64 mask)
 {
@@ -1639,9 +1716,36 @@ dmq_pop_nb(int8 *sender_mask_pos, StringInfo msg, uint64 mask, bool *wait)
    return false;
 }
 
+/*
+ * Accepts a bitmask of participants and the array of send connection
+ * counters previously filled by dmq_get_sendconn_cnt; returns the mask
+ * with the bits unset for those counterparties with whom we've lost the
+ * send connection since then.
+ */
+uint64
+dmq_purge_failed_participants(uint64 participants, int *sconn_cnt)
+{
+   int         i;
+   uint64      res = participants;
+
+   for (i = 0; i < DMQ_N_MASK_POS; i++)
+   {
+       if (BIT_CHECK(participants, i) &&
+           (sconn_cnt[i] == DMQSCONN_DEAD ||
+            sconn_cnt[i] != dmq_state->sconn_cnt[i]))
+           BIT_CLEAR(res, i);
+   }
+   return res;
+}
+
+/*
+ * recv_mask_pos is a short (< DMQ_N_MASK_POS) identifier for receiver_name,
+ * used to track connection failures -- it must match the mask_pos passed to
+ * dmq_attach_receiver to work!
+ */
 DmqDestinationId
 dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
-                   int recv_timeout)
+                   int8 recv_mask_pos, int recv_timeout)
 {
    DmqDestinationId dest_id;
    pid_t       sender_pid;
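Taken together with the counters in DmqSharedState, the intended calling pattern is: a live connection carries a nonzero "generation" number, the shared slot is reset to DMQSCONN_DEAD on any failure, and the waiter compares its snapshot against the current value while waiting on a timeout. A sketch of that pattern follows; send_request_to() and try_receive_response() are hypothetical stand-ins for the caller's own messaging code (built on the dmq send/receive API, e.g. dmq_pop_nb above), everything else comes from this file.

/* Sketch of the intended caller-side protocol; not part of this patch. */
static void
request_response_example(uint64 participants)
{
    int         sconn_cnt[DMQ_MAX_DESTINATIONS];
    uint64      pending = participants;
    int         i;

    /* remember the send connection generations before issuing requests */
    dmq_get_sendconn_cnt(participants, sconn_cnt);

    for (i = 0; i < DMQ_N_MASK_POS; i++)
    {
        if (BIT_CHECK(participants, i))
            send_request_to(i);         /* hypothetical helper */
    }

    while (pending != 0)
    {
        int8        from;

        /* drop counterparties whose send conn died or was re-established */
        pending = dmq_purge_failed_participants(pending, sconn_cnt);

        /*
         * Wait with a timeout: nobody wakes us on send conn failures, so
         * the purge check above has to be repeated periodically.
         */
        if (try_receive_response(&from, 100 /* ms */))  /* hypothetical helper */
            BIT_CLEAR(pending, from);
    }
}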
@@ -1658,6 +1762,7 @@ dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
            strncpy(dest->connstr, connstr, DMQ_CONNSTR_MAX_LEN);
            dest->recv_timeout = recv_timeout;
            dest->active = true;
+           dest->mask_pos = recv_mask_pos;
            break;
        }
    }
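For completeness, a sketch of a registration call with the extended signature; the connection string, names, mask position and timeout value here are made up, and the only new requirement is that recv_mask_pos matches the mask_pos used with dmq_attach_receiver.

/* Hypothetical caller; all concrete values are illustrative. */
static DmqDestinationId
add_peer_example(void)
{
    int8        peer_mask_pos = 3;  /* must match dmq_attach_receiver's mask_pos */

    return dmq_destination_add("host=node3 port=5432 dbname=postgres",
                               "node1",         /* sender_name */
                               "node3",         /* receiver_name */
                               peer_mask_pos,
                               5000);           /* recv_timeout */
}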