51
51
#include "multimaster.h"
52
52
53
53
#define MAX_CONNECT_ATTEMPTS 10
54
- #define TX_BUFFER_SIZE 1024
54
+ #define BUFFER_SIZE 1024
55
+ #define BUFFER_SIZE 1024
55
56
56
57
typedef struct
57
58
{
@@ -61,12 +62,11 @@ typedef struct
61
62
62
63
typedef struct
63
64
{
64
- DtmCommitMessage data [TX_BUFFER_SIZE ];
65
+ DtmCommitMessage data [BUFFER_SIZE ];
65
66
int used ;
66
- } DtmTxBuffer ;
67
+ } DtmBuffer ;
67
68
68
69
static int * sockets ;
69
- static DtmTxBuffer * txBuffers ;
70
70
71
71
static BackgroundWorker DtmSender = {
72
72
"mm-sender" ,
@@ -115,6 +115,34 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
115
115
return 1 ;
116
116
}
117
117
118
+ #ifdef USE_EPOLL
119
+ static int epollfd ;
120
+ #else
121
+ static int max_fd ;
122
+ static fd_set inset ;
123
+ #endif
124
+
125
+ inline void registerSocket (int fd , int i )
126
+ {
127
+ #ifdef USE_EPOLL
128
+ struct epoll_event ev ;
129
+ ev .events = EPOLLIN ;
130
+ ev .data .u32 = i ;
131
+ if (epoll_ctl (epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
132
+ char buf [ERR_BUF_SIZE ];
133
+ sprintf (buf , "Failed to add socket %d to epoll set" , fd );
134
+ shub -> params -> error_handler (buf , SHUB_FATAL_ERROR );
135
+ }
136
+ #else
137
+ FD_SET (fd , & inset );
138
+ if (fd > max_fd ) {
139
+ max_fd = fd ;
140
+ }
141
+ #endif
142
+ }
143
+
144
+
145
+
118
146
static int connectSocket (char const * host , int port )
119
147
{
120
148
struct sockaddr_in sock_inet ;
@@ -206,20 +234,47 @@ static void acceptConnections()
206
234
elog (ERROR , "Failed to bind socket: %d" , errno );
207
235
}
208
236
209
- for (i = 0 ; i < nNodes - 1 ; i ++ ) {
210
- sockets [ i ] = accept (sd , NULL , NULL );
211
- if (sockets [ i ] < 0 ) {
237
+ for (i = 0 ; i < nNodes ; i ++ ) {
238
+ int fd = accept (sd , NULL , NULL );
239
+ if (fd < 0 ) {
212
240
elog (ERROR , "Failed to accept socket: %d" , errno );
213
241
}
242
+ registerSocket (fd , i );
243
+ sockets [i ] = fd ;
214
244
}
215
245
}
216
246
247
+ static void WriteSocket (int sd , void const * buf , int size )
248
+ {
249
+ char * src = (char * )buf ;
250
+ while (size != 0 ) {
251
+ int n = send (sd , src , size , 0 );
252
+ if (n <= 0 ) {
253
+ return 0 ;
254
+ }
255
+ size -= n ;
256
+ src += n ;
257
+ }
258
+ }
259
+
260
+ static int ReadSocket (int sd , void * buf , int buf_size )
261
+ {
262
+ int rc = recv (sd , buf , buf_size , 0 );
263
+ if (rc <= 0 ) {
264
+ elog (ERROR , "Arbiter failed to read socket: %d" , rc );
265
+ }
266
+ return rc ;
267
+ }
268
+
269
+
217
270
static void DtmTransSender (Datum arg )
218
271
{
219
272
int nNodes = dtm -> nNodes ;
220
273
int i ;
221
- DtmCommitMessage * txBuffer = (DtmCommitMessage * )palloc (sizeof (DtmTxBuffer )* ( nNodes ) );
274
+ DtmTxBuffer * txBuffer = (DtmTxBuffer * )palloc (sizeof (DtmTxBuffer )* nNodes );
222
275
276
+ sockets = (int * )palloc (sizeof (int )* nNodes );
277
+
223
278
openConnections ();
224
279
225
280
for (i = 0 ; i < nNodes ; i ++ ) {
@@ -229,31 +284,106 @@ static void DtmTransSender(Datum arg)
229
284
while (true) {
230
285
DtmTransState * ts ;
231
286
PGSemaphoreLock (& dtm -> semphore );
287
+ CHECK_FOR_INTERRUPTS ();
232
288
233
289
SpinLockAcquire (& dtm -> spinlock );
234
290
ts = dtm -> pendingTransactions ;
235
291
dtm -> pendingTransactions = NULL ;
236
292
SpinLockRelease (& dtm -> spinlock );
237
293
238
- for (ts = dtm -> pendingTransactions ; ts != NULL ; ts = ts -> nextPending ) {
239
- int node = ts -> gtid .node ;
240
- Assert (node != MMNodeId );
241
- node -= 1 ;
242
- if (txBuffer [node ].used == TX_BUFFER_SIZE ) {
243
- WriteSocket (sockets [node ], txBuffer [node ].data , txBuffer [node ].used * sizeof (DtmCommitRequest ));
244
- txBuffer [node ].used = 0 ;
294
+ for (; ts != NULL ; ts = ts -> nextPending ) {
295
+ i = ts -> gtid .node - 1 ;
296
+ Assert (i != MMNodeId );
297
+ if (txBuffer [i ].used == BUFFER_SIZE ) {
298
+ WriteSocket (sockets [i ], txBuffer [i ].data , txBuffer [i ].used * sizeof (DtmCommitRequest ));
299
+ txBuffer [i ].used = 0 ;
245
300
}
246
- txBuffer [node ].data [txBuffer [node ].used ].xid = ts -> xid ;
247
- txBuffer [node ].data [txBuffer [node ].used ].csn = ts -> csn ;
248
- txBuffer [node ].used += 1 ;
301
+ txBuffer [i ].data [txBuffer [i ].used ].xid = ts -> xid ;
302
+ txBuffer [i ].data [txBuffer [i ].used ].csn = ts -> csn ;
303
+ txBuffer [i ].used += 1 ;
249
304
}
250
- dtm -> pendingTransactions = NULL ;
251
-
305
+ for (i = 0 ; i < nNodes ; i ++ ) {
306
+ if (txBuffer [i ].used != 0 ) {
307
+ WriteSocket (sockets [i ], txBuffer [i ].data , txBuffer [i ].used * sizeof (DtmCommitRequest ));
308
+ txBuffer [i ].used = 0 ;
309
+ }
310
+ }
252
311
}
253
312
}
254
313
255
314
static void DtmTransReceiver (Datum arg )
256
315
{
316
+ int nNodes = dtm -> nNodes - 1 ;
317
+ int i , j , rc ;
318
+ int rxBufPos = 0 ;
319
+ DtmBuffer * rxBuffer = (DtmBuffer * )palloc (sizeof (DtmBuffer )* nNodes );
320
+ HTAB * xid2state ;
321
+
322
+ #ifdef USE_EPOLL
323
+ struct epoll_event * events = (struct epoll_event * )palloc (SIZEOF (struct epoll_event )* nNodes );
324
+ epollfd = epoll_create (nNodes );
325
+ #else
326
+ FD_ZERO (& inset );
327
+ max_fd = 0 ;
328
+ #endif
329
+
257
330
acceptConnections ();
331
+ xid2state = MMCreateHash ();
332
+
333
+ for (i = 0 ; i < nNodes ; i ++ ) {
334
+ txBuffer [i ].used = 0 ;
335
+ }
336
+
337
+ while (true) {
338
+ #ifdef USE_EPOLL
339
+ rc = epoll_wait (epollfd , events , MAX_EVENTS , shub -> in_buffer_used == 0 ? -1 : shub -> params -> delay );
340
+ if (rc < 0 ) {
341
+ elog (ERROR , "epoll failed: %d" , errno );
342
+ }
343
+ for (j = 0 ; j < rc ; j ++ ) {
344
+ i = events [j ].data .u32 ;
345
+ if (events [j ].events & EPOLLERR ) {
346
+ struct sockaddr_in insock ;
347
+ socklen_t len = sizeof (insock );
348
+ getpeername (fd , (struct sockaddr * )& insock , & len );
349
+ elog (WARNING , "Loose connection with %s" , inet_ntoa (insock .sin_addr_ ));
350
+ epoll_ctl (epollfd , EPOLL_CTL_DEL , fd , NULL );
351
+ }
352
+ else if (events [j ].events & EPOLLIN )
353
+ #else
354
+ fd_set events ;
355
+ events = inset ;
356
+ rc = select (max_fd + 1 , & events , NULL , NULL , NULL );
357
+ if (rc < 0 ) {
358
+ elog (ERROR , "select failed: %d" , errno );
359
+ }
360
+ for (i = 0 ; i < nNodes ; i ++ ) {
361
+ if (FD_ISSET (sockets [i ], & events ))
362
+ #endif
363
+ {
364
+ int nResponses ;
365
+ rxBuffer [i ].used += ReadSocket (sockets [i ], (char * )rxBuffer [i ].data + rxBuffer [i ].used , RX_BUFFER_SIZE - rxBufPos );
366
+ nResponses = rxBuffer [i ].used /sizeof (DtmCommitRequest );
367
+
368
+ LWLockAcquire (& dtm -> hashLock , LW_SHARED );
369
+
370
+ for (j = 0 ; j < nResponses ; j ++ ) {
371
+ DtmCommitRequest * req = & rxBuffer [i ].data [j ];
372
+ DtmTransState * ts = (DtmTransState * )hash_search (xid2state , & req -> xid , HASH_FIND , NULL );
373
+ Assert (ts != NULL );
374
+ if (req -> csn > ts -> csn ) {
375
+ ts -> csn = req -> csn ;
376
+ }
377
+ if (ts -> nVotes == dtm -> nNodes - 1 ) {
378
+ SetLatch (& ProcGlobal -> allProcs [ts -> pid ].procLatch );
379
+ }
380
+ }
381
+ if (rxBuffer [i ].used != nResponses * sizeof (DtmCommitRequest )) {
382
+ rxBuffer [i ].used -= nResponses * sizeof (DtmCommitRequest );
383
+ memmove (rxBuffer [i ].data , (char * )rxBuffer [i ].data + nResponses * sizeof (DtmCommitRequest ), rxBuffer [i ].used );
384
+ }
385
+ }
386
+ }
387
+ }
258
388
}
259
389
0 commit comments