38
38
#define STREAMING_HEADER_SIZE (1+sizeof(WalDataMessageHeader))
39
39
#define STREAMING_KEEPALIVE_SIZE (1+sizeof(PrimaryKeepaliveMessage))
40
40
41
+ /* fd for currently open WAL file */
42
+ static int walfile = -1 ;
43
+
44
+
41
45
/*
42
46
* Open a new WAL file in the specified directory. Store the name
43
47
* (not including the full directory) in namebuf. Assumes there is
@@ -96,6 +100,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
96
100
{
97
101
fprintf (stderr , _ ("%s: could not pad WAL segment %s: %s\n" ),
98
102
progname , fn , strerror (errno ));
103
+ free (zerobuf );
99
104
close (f );
100
105
unlink (fn );
101
106
return -1 ;
@@ -120,7 +125,7 @@ open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebu
120
125
* completed writing the whole segment.
121
126
*/
122
127
static bool
123
- close_walfile (int walfile , char * basedir , char * walname , bool segment_complete )
128
+ close_walfile (char * basedir , char * walname , bool segment_complete )
124
129
{
125
130
off_t currpos = lseek (walfile , 0 , SEEK_CUR );
126
131
@@ -142,8 +147,10 @@ close_walfile(int walfile, char *basedir, char *walname, bool segment_complete)
142
147
{
143
148
fprintf (stderr , _ ("%s: could not close file %s: %s\n" ),
144
149
progname , walname , strerror (errno ));
150
+ walfile = -1 ;
145
151
return false;
146
152
}
153
+ walfile = -1 ;
147
154
148
155
/*
149
156
* Rename the .partial file only if we've completed writing the whole
@@ -270,7 +277,6 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
270
277
char current_walfile_name [MAXPGPATH ];
271
278
PGresult * res ;
272
279
char * copybuf = NULL ;
273
- int walfile = -1 ;
274
280
int64 last_status = -1 ;
275
281
XLogRecPtr blockpos = InvalidXLogRecPtr ;
276
282
@@ -315,6 +321,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
315
321
{
316
322
fprintf (stderr , _ ("%s: could not start replication: %s\n" ),
317
323
progname , PQresultErrorMessage (res ));
324
+ PQclear (res );
318
325
return false;
319
326
}
320
327
PQclear (res );
@@ -341,9 +348,9 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
341
348
*/
342
349
if (stream_stop && stream_stop (blockpos , timeline , false))
343
350
{
344
- if (walfile != -1 )
351
+ if (walfile != -1 && ! close_walfile ( basedir , current_walfile_name , rename_partial ) )
345
352
/* Potential error message is written by close_walfile */
346
- return close_walfile ( walfile , basedir , current_walfile_name , rename_partial ) ;
353
+ goto error ;
347
354
return true;
348
355
}
349
356
@@ -370,7 +377,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
370
377
{
371
378
fprintf (stderr , _ ("%s: could not send feedback packet: %s" ),
372
379
progname , PQerrorMessage (conn ));
373
- return false ;
380
+ goto error ;
374
381
}
375
382
376
383
last_status = now ;
@@ -421,14 +428,14 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
421
428
{
422
429
fprintf (stderr , _ ("%s: select() failed: %s\n" ),
423
430
progname , strerror (errno ));
424
- return false ;
431
+ goto error ;
425
432
}
426
433
/* Else there is actually data on the socket */
427
434
if (PQconsumeInput (conn ) == 0 )
428
435
{
429
436
fprintf (stderr , _ ("%s: could not receive data from WAL stream: %s\n" ),
430
437
progname , PQerrorMessage (conn ));
431
- return false ;
438
+ goto error ;
432
439
}
433
440
continue ;
434
441
}
@@ -439,7 +446,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
439
446
{
440
447
fprintf (stderr , _ ("%s: could not read copy data: %s\n" ),
441
448
progname , PQerrorMessage (conn ));
442
- return false ;
449
+ goto error ;
443
450
}
444
451
if (copybuf [0 ] == 'k' )
445
452
{
@@ -451,21 +458,21 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
451
458
{
452
459
fprintf (stderr , _ ("%s: keepalive message is incorrect size: %d\n" ),
453
460
progname , r );
454
- return false ;
461
+ goto error ;
455
462
}
456
463
continue ;
457
464
}
458
465
else if (copybuf [0 ] != 'w' )
459
466
{
460
467
fprintf (stderr , _ ("%s: unrecognized streaming header: \"%c\"\n" ),
461
468
progname , copybuf [0 ]);
462
- return false ;
469
+ goto error ;
463
470
}
464
471
if (r < STREAMING_HEADER_SIZE + 1 )
465
472
{
466
473
fprintf (stderr , _ ("%s: streaming header too small: %d\n" ),
467
474
progname , r );
468
- return false ;
475
+ goto error ;
469
476
}
470
477
471
478
/* Extract WAL location for this block */
@@ -483,7 +490,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
483
490
{
484
491
fprintf (stderr , _ ("%s: received xlog record for offset %u with no file open\n" ),
485
492
progname , xlogoff );
486
- return false ;
493
+ goto error ;
487
494
}
488
495
}
489
496
else
@@ -494,7 +501,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
494
501
{
495
502
fprintf (stderr , _ ("%s: got WAL data offset %08x, expected %08x\n" ),
496
503
progname , xlogoff , (int ) lseek (walfile , 0 , SEEK_CUR ));
497
- return false ;
504
+ goto error ;
498
505
}
499
506
}
500
507
@@ -520,7 +527,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
520
527
basedir , current_walfile_name );
521
528
if (walfile == -1 )
522
529
/* Error logged by open_walfile */
523
- return false ;
530
+ goto error ;
524
531
}
525
532
526
533
if (write (walfile ,
@@ -532,7 +539,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
532
539
bytes_to_write ,
533
540
current_walfile_name ,
534
541
strerror (errno ));
535
- return false ;
542
+ goto error ;
536
543
}
537
544
538
545
/* Write was successful, advance our position */
@@ -544,11 +551,10 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
544
551
/* Did we reach the end of a WAL segment? */
545
552
if (blockpos % XLOG_SEG_SIZE == 0 )
546
553
{
547
- if (!close_walfile (walfile , basedir , current_walfile_name , false))
554
+ if (!close_walfile (basedir , current_walfile_name , false))
548
555
/* Error message written in close_walfile() */
549
- return false ;
556
+ goto error ;
550
557
551
- walfile = -1 ;
552
558
xlogoff = 0 ;
553
559
554
560
if (stream_stop != NULL )
@@ -577,8 +583,22 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysi
577
583
{
578
584
fprintf (stderr , _ ("%s: unexpected termination of replication stream: %s\n" ),
579
585
progname , PQresultErrorMessage (res ));
580
- return false ;
586
+ goto error ;
581
587
}
582
588
PQclear (res );
589
+
590
+ if (copybuf != NULL )
591
+ PQfreemem (copybuf );
592
+ if (walfile != -1 && close (walfile ) != 0 )
593
+ fprintf (stderr , _ ("%s: could not close file %s: %s\n" ),
594
+ progname , current_walfile_name , strerror (errno ));
583
595
return true;
596
+
597
+ error :
598
+ if (copybuf != NULL )
599
+ PQfreemem (copybuf );
600
+ if (walfile != -1 && close (walfile ) != 0 )
601
+ fprintf (stderr , _ ("%s: could not close file %s: %s\n" ),
602
+ progname , current_walfile_name , strerror (errno ));
603
+ return false;
584
604
}
0 commit comments