@@ -125,7 +125,8 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
125
125
if (newbytes == -1 )
126
126
{
127
127
if (errno == EAGAIN ) {
128
- if (poll_until_writable (sock , timeout_ms - msec (now - start ))) {
128
+ int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
129
+ if (poll_until_writable (sock , remaining_ms )) {
129
130
continue ;
130
131
}
131
132
}
@@ -157,7 +158,8 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
157
158
if (newbytes == -1 )
158
159
{
159
160
if (errno == EAGAIN ) {
160
- if (poll_until_readable (sock , timeout_ms - msec (now - start ))) {
161
+ int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
162
+ if (poll_until_readable (sock , remaining_ms )) {
161
163
continue ;
162
164
}
163
165
}
@@ -224,8 +226,9 @@ static bool connect_leader(int timeout_ms)
224
226
while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
225
227
{
226
228
TimestampTz past = now ;
229
+ int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - elapsed_ms ;
227
230
228
- if (poll_until_writable (sd , timeout_ms - elapsed_ms ))
231
+ if (poll_until_writable (sd , remaining_ms ))
229
232
{
230
233
int err ;
231
234
socklen_t optlen = sizeof (err );
@@ -301,7 +304,7 @@ raftable_sql_get(PG_FUNCTION_ARGS)
301
304
302
305
static bool try_sending_update (RaftableUpdate * ru , size_t size , int timeout_ms )
303
306
{
304
- int s , status ;
307
+ int s , status , remaining_ms ;
305
308
TimestampTz start , now ;
306
309
307
310
now = start = GetCurrentTimestamp ();
@@ -310,45 +313,49 @@ static bool try_sending_update(RaftableUpdate *ru, size_t size, int timeout_ms)
310
313
if (s < 0 ) return false;
311
314
312
315
now = GetCurrentTimestamp ();
316
+ remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
313
317
if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
314
318
{
315
319
elog (WARNING , "update: connect() timed out" );
316
320
return false;
317
321
}
318
322
319
- if (!timed_write (s , & size , sizeof (size ), timeout_ms - msec ( now - start ) ))
323
+ if (!timed_write (s , & size , sizeof (size ), remaining_ms ))
320
324
{
321
325
elog (WARNING , "failed to send the update size to the leader" );
322
326
return false;
323
327
}
324
328
325
329
now = GetCurrentTimestamp ();
330
+ remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
326
331
if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
327
332
{
328
333
elog (WARNING , "update: send(size) timed out" );
329
334
return false;
330
335
}
331
336
332
- if (!timed_write (s , ru , size , timeout_ms - msec ( now - start ) ))
337
+ if (!timed_write (s , ru , size , remaining_ms ))
333
338
{
334
339
elog (WARNING , "failed to send the update to the leader" );
335
340
return false;
336
341
}
337
342
338
343
now = GetCurrentTimestamp ();
344
+ remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
339
345
if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
340
346
{
341
347
elog (WARNING , "update: send(body) timed out" );
342
348
return false;
343
349
}
344
350
345
- if (!timed_read (s , & status , sizeof (status ), timeout_ms - msec ( now - start ) ))
351
+ if (!timed_read (s , & status , sizeof (status ), remaining_ms ))
346
352
{
347
353
elog (WARNING , "failed to recv the update status from the leader" );
348
354
return false;
349
355
}
350
356
351
357
now = GetCurrentTimestamp ();
358
+ remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
352
359
if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
353
360
{
354
361
elog (WARNING , "update: recv(status) timed out" );
@@ -396,7 +403,8 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
396
403
while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
397
404
{
398
405
TimestampTz past = now ;
399
- if (try_sending_update (ru , size , timeout_ms - elapsed_ms ))
406
+ int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - elapsed_ms ;
407
+ if (try_sending_update (ru , size , remaining_ms ))
400
408
{
401
409
pfree (ru );
402
410
return true;
0 commit comments