Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit bd2cb9a

Browse files
committed
Implement a chunking protocol for writes to the syslogger pipe, with messages
reassembled in the syslogger before writing to the log file. This prevents partial messages from being written, which mucks up log rotation, and messages from different backends being interleaved, which causes garbled logs. Backport as far as 8.0, where the syslogger was introduced. Tom Lane and Andrew Dunstan
1 parent 320f820 commit bd2cb9a

File tree

3 files changed

+345
-16
lines changed

3 files changed

+345
-16
lines changed

src/backend/postmaster/syslogger.c

Lines changed: 255 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
*
1919
*
2020
* IDENTIFICATION
21-
* $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.31 2007/06/04 22:21:42 adunstan Exp $
21+
* $PostgreSQL: pgsql/src/backend/postmaster/syslogger.c,v 1.32 2007/06/14 01:48:51 adunstan Exp $
2222
*
2323
*-------------------------------------------------------------------------
2424
*/
@@ -31,6 +31,7 @@
3131
#include <sys/stat.h>
3232
#include <sys/time.h>
3333

34+
#include "lib/stringinfo.h"
3435
#include "libpq/pqsignal.h"
3536
#include "miscadmin.h"
3637
#include "pgtime.h"
@@ -54,6 +55,13 @@
5455
#define LBF_MODE _IOLBF
5556
#endif
5657

58+
/*
59+
* We read() into a temp buffer twice as big as a chunk, so that any fragment
60+
* left after processing can be moved down to the front and we'll still have
61+
* room to read a full chunk.
62+
*/
63+
#define READ_BUF_SIZE (2 * PIPE_CHUNK_SIZE)
64+
5765

5866
/*
5967
* GUC parameters. Redirect_stderr cannot be changed after postmaster
@@ -75,15 +83,28 @@ bool am_syslogger = false;
7583
* Private state
7684
*/
7785
static pg_time_t next_rotation_time;
78-
7986
static bool redirection_done = false;
80-
8187
static bool pipe_eof_seen = false;
82-
8388
static FILE *syslogFile = NULL;
84-
8589
static char *last_file_name = NULL;
8690

91+
/*
92+
* Buffers for saving partial messages from different backends. We don't expect
93+
* that there will be very many outstanding at one time, so 20 seems plenty of
94+
* leeway. If this array gets full we won't lose messages, but we will lose
95+
* the protocol protection against them being partially written or interleaved.
96+
*
97+
* An inactive buffer has pid == 0 and undefined contents of data.
98+
*/
99+
typedef struct
100+
{
101+
int32 pid; /* PID of source process */
102+
StringInfoData data; /* accumulated data, as a StringInfo */
103+
} save_buffer;
104+
105+
#define CHUNK_SLOTS 20
106+
static save_buffer saved_chunks[CHUNK_SLOTS];
107+
87108
/* These must be exported for EXEC_BACKEND case ... annoying */
88109
#ifndef WIN32
89110
int syslogPipe[2] = {-1, -1};
@@ -108,6 +129,8 @@ static volatile sig_atomic_t rotation_requested = false;
108129
static pid_t syslogger_forkexec(void);
109130
static void syslogger_parseArgs(int argc, char *argv[]);
110131
#endif
132+
static void process_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
133+
static void flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer);
111134

112135
#ifdef WIN32
113136
static unsigned int __stdcall pipeThread(void *arg);
@@ -126,6 +149,10 @@ static void sigUsr1Handler(SIGNAL_ARGS);
126149
NON_EXEC_STATIC void
127150
SysLoggerMain(int argc, char *argv[])
128151
{
152+
#ifndef WIN32
153+
char logbuffer[READ_BUF_SIZE];
154+
int bytes_in_logbuffer = 0;
155+
#endif
129156
char *currentLogDir;
130157
char *currentLogFilename;
131158
int currentLogRotationAge;
@@ -244,7 +271,6 @@ SysLoggerMain(int argc, char *argv[])
244271
bool time_based_rotation = false;
245272

246273
#ifndef WIN32
247-
char logbuffer[1024];
248274
int bytesRead;
249275
int rc;
250276
fd_set rfds;
@@ -326,8 +352,8 @@ SysLoggerMain(int argc, char *argv[])
326352
else if (rc > 0 && FD_ISSET(syslogPipe[0], &rfds))
327353
{
328354
bytesRead = piperead(syslogPipe[0],
329-
logbuffer, sizeof(logbuffer));
330-
355+
logbuffer + bytes_in_logbuffer,
356+
sizeof(logbuffer) - bytes_in_logbuffer);
331357
if (bytesRead < 0)
332358
{
333359
if (errno != EINTR)
@@ -337,7 +363,8 @@ SysLoggerMain(int argc, char *argv[])
337363
}
338364
else if (bytesRead > 0)
339365
{
340-
write_syslogger_file(logbuffer, bytesRead);
366+
bytes_in_logbuffer += bytesRead;
367+
process_pipe_input(logbuffer, &bytes_in_logbuffer);
341368
continue;
342369
}
343370
else
@@ -349,6 +376,9 @@ SysLoggerMain(int argc, char *argv[])
349376
* and all backends are shut down, and we are done.
350377
*/
351378
pipe_eof_seen = true;
379+
380+
/* if there's any data left then force it out now */
381+
flush_pipe_input(logbuffer, &bytes_in_logbuffer);
352382
}
353383
}
354384
#else /* WIN32 */
@@ -611,6 +641,207 @@ syslogger_parseArgs(int argc, char *argv[])
611641
#endif /* EXEC_BACKEND */
612642

613643

644+
/* --------------------------------
645+
* pipe protocol handling
646+
* --------------------------------
647+
*/
648+
649+
/*
650+
* Process data received through the syslogger pipe.
651+
*
652+
* This routine interprets the log pipe protocol which sends log messages as
653+
* (hopefully atomic) chunks - such chunks are detected and reassembled here.
654+
*
655+
* The protocol has a header that starts with two nul bytes, then has a 16 bit
656+
* length, the pid of the sending process, and a flag to indicate if it is
657+
* the last chunk in a message. Incomplete chunks are saved until we read some
658+
* more, and non-final chunks are accumulated until we get the final chunk.
659+
*
660+
* All of this is to avoid 2 problems:
661+
* . partial messages being written to logfiles (messes rotation), and
662+
* . messages from different backends being interleaved (messages garbled).
663+
*
664+
* Any non-protocol messages are written out directly. These should only come
665+
* from non-PostgreSQL sources, however (e.g. third party libraries writing to
666+
* stderr).
667+
*
668+
* logbuffer is the data input buffer, and *bytes_in_logbuffer is the number
669+
* of bytes present. On exit, any not-yet-eaten data is left-justified in
670+
* logbuffer, and *bytes_in_logbuffer is updated.
671+
*/
672+
static void
673+
process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
674+
{
675+
char *cursor = logbuffer;
676+
int count = *bytes_in_logbuffer;
677+
678+
/* While we have enough for a header, process data... */
679+
while (count >= (int) sizeof(PipeProtoHeader))
680+
{
681+
PipeProtoHeader p;
682+
int chunklen;
683+
684+
/* Do we have a valid header? */
685+
memcpy(&p, cursor, sizeof(PipeProtoHeader));
686+
if (p.nuls[0] == '\0' && p.nuls[1] == '\0' &&
687+
p.len > 0 && p.len <= PIPE_MAX_PAYLOAD &&
688+
p.pid != 0 &&
689+
(p.is_last == 't' || p.is_last == 'f'))
690+
{
691+
chunklen = PIPE_HEADER_SIZE + p.len;
692+
693+
/* Fall out of loop if we don't have the whole chunk yet */
694+
if (count < chunklen)
695+
break;
696+
697+
if (p.is_last == 'f')
698+
{
699+
/*
700+
* Save a complete non-final chunk in the per-pid buffer
701+
* if possible - if not just write it out.
702+
*/
703+
int free_slot = -1, existing_slot = -1;
704+
int i;
705+
StringInfo str;
706+
707+
for (i = 0; i < CHUNK_SLOTS; i++)
708+
{
709+
if (saved_chunks[i].pid == p.pid)
710+
{
711+
existing_slot = i;
712+
break;
713+
}
714+
if (free_slot < 0 && saved_chunks[i].pid == 0)
715+
free_slot = i;
716+
}
717+
if (existing_slot >= 0)
718+
{
719+
str = &(saved_chunks[existing_slot].data);
720+
appendBinaryStringInfo(str,
721+
cursor + PIPE_HEADER_SIZE,
722+
p.len);
723+
}
724+
else if (free_slot >= 0)
725+
{
726+
saved_chunks[free_slot].pid = p.pid;
727+
str = &(saved_chunks[free_slot].data);
728+
initStringInfo(str);
729+
appendBinaryStringInfo(str,
730+
cursor + PIPE_HEADER_SIZE,
731+
p.len);
732+
}
733+
else
734+
{
735+
/*
736+
* If there is no free slot we'll just have to take our
737+
* chances and write out a partial message and hope that
738+
* it's not followed by something from another pid.
739+
*/
740+
write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
741+
}
742+
}
743+
else
744+
{
745+
/*
746+
* Final chunk --- add it to anything saved for that pid, and
747+
* either way write the whole thing out.
748+
*/
749+
int existing_slot = -1;
750+
int i;
751+
StringInfo str;
752+
753+
for (i = 0; i < CHUNK_SLOTS; i++)
754+
{
755+
if (saved_chunks[i].pid == p.pid)
756+
{
757+
existing_slot = i;
758+
break;
759+
}
760+
}
761+
if (existing_slot >= 0)
762+
{
763+
str = &(saved_chunks[existing_slot].data);
764+
appendBinaryStringInfo(str,
765+
cursor + PIPE_HEADER_SIZE,
766+
p.len);
767+
write_syslogger_file(str->data, str->len);
768+
saved_chunks[existing_slot].pid = 0;
769+
pfree(str->data);
770+
}
771+
else
772+
{
773+
/* The whole message was one chunk, evidently. */
774+
write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len);
775+
}
776+
}
777+
778+
/* Finished processing this chunk */
779+
cursor += chunklen;
780+
count -= chunklen;
781+
}
782+
else
783+
{
784+
/* Process non-protocol data */
785+
786+
/*
787+
* Look for the start of a protocol header. If found, dump data
788+
* up to there and repeat the loop. Otherwise, dump it all and
789+
* fall out of the loop. (Note: we want to dump it all if
790+
* at all possible, so as to avoid dividing non-protocol messages
791+
* across logfiles. We expect that in many scenarios, a
792+
* non-protocol message will arrive all in one read(), and we
793+
* want to respect the read() boundary if possible.)
794+
*/
795+
for (chunklen = 1; chunklen < count; chunklen++)
796+
{
797+
if (cursor[chunklen] == '\0')
798+
break;
799+
}
800+
write_syslogger_file(cursor, chunklen);
801+
cursor += chunklen;
802+
count -= chunklen;
803+
}
804+
}
805+
806+
/* We don't have a full chunk, so left-align what remains in the buffer */
807+
if (count > 0 && cursor != logbuffer)
808+
memmove(logbuffer, cursor, count);
809+
*bytes_in_logbuffer = count;
810+
}
811+
812+
/*
813+
* Force out any buffered data
814+
*
815+
* This is currently used only at syslogger shutdown, but could perhaps be
816+
* useful at other times, so it is careful to leave things in a clean state.
817+
*/
818+
static void
819+
flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
820+
{
821+
int i;
822+
StringInfo str;
823+
824+
/* Dump any incomplete protocol messages */
825+
for (i = 0; i < CHUNK_SLOTS; i++)
826+
{
827+
if (saved_chunks[i].pid != 0)
828+
{
829+
str = &(saved_chunks[i].data);
830+
write_syslogger_file(str->data, str->len);
831+
saved_chunks[i].pid = 0;
832+
pfree(str->data);
833+
}
834+
}
835+
/*
836+
* Force out any remaining pipe data as-is; we don't bother trying to
837+
* remove any protocol headers that may exist in it.
838+
*/
839+
if (*bytes_in_logbuffer > 0)
840+
write_syslogger_file(logbuffer, *bytes_in_logbuffer);
841+
*bytes_in_logbuffer = 0;
842+
}
843+
844+
614845
/* --------------------------------
615846
* logfile routines
616847
* --------------------------------
@@ -653,12 +884,16 @@ write_syslogger_file(const char *buffer, int count)
653884
static unsigned int __stdcall
654885
pipeThread(void *arg)
655886
{
656-
DWORD bytesRead;
657-
char logbuffer[1024];
887+
char logbuffer[READ_BUF_SIZE];
888+
int bytes_in_logbuffer = 0;
658889

659890
for (;;)
660891
{
661-
if (!ReadFile(syslogPipe[0], logbuffer, sizeof(logbuffer),
892+
DWORD bytesRead;
893+
894+
if (!ReadFile(syslogPipe[0],
895+
logbuffer + bytes_in_logbuffer,
896+
sizeof(logbuffer) - bytes_in_logbuffer,
662897
&bytesRead, 0))
663898
{
664899
DWORD error = GetLastError();
@@ -672,11 +907,18 @@ pipeThread(void *arg)
672907
errmsg("could not read from logger pipe: %m")));
673908
}
674909
else if (bytesRead > 0)
675-
write_syslogger_file(logbuffer, bytesRead);
910+
{
911+
bytes_in_logbuffer += bytesRead;
912+
process_pipe_input(logbuffer, &bytes_in_logbuffer);
913+
}
676914
}
677915

678916
/* We exit the above loop only upon detecting pipe EOF */
679917
pipe_eof_seen = true;
918+
919+
/* if there's any data left then force it out now */
920+
flush_pipe_input(logbuffer, &bytes_in_logbuffer);
921+
680922
_endthread();
681923
return 0;
682924
}

0 commit comments

Comments
 (0)