Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit fb03d08

Browse files
committed
Rationalize parallel dump/restore's handling of worker cmd/status messages.
The existing APIs for creating and parsing command and status messages are rather messy; for example, archive-format modules have to provide code for constructing command messages, which is entirely pointless since the code to read them is hard-wired in WaitForCommands() and hence no format-specific variation is actually possible. But there's little foreseeable reason to need format-specific variation anyway. The situation for status messages is no better; at least those are both constructed and parsed by format-specific code, but said code is quite redundant since there's no actual need for format-specific variation. To add insult to injury, the first API involves returning pointers to static buffers, which is bad, while the second involves returning pointers to malloc'd strings, which is safer but randomly inconsistent. Hence, get rid of the MasterStartParallelItem and MasterEndParallelItem APIs, and instead write centralized functions that construct and parse command and status messages. If we ever do need more flexibility, these functions can be the standard implementations of format-specific callback methods, but that's a long way off if it ever happens. Tom Lane, reviewed by Kevin Grittner Discussion: <17340.1464465717@sss.pgh.pa.us>
1 parent b7b8cc0 commit fb03d08

File tree

5 files changed

+162
-274
lines changed

5 files changed

+162
-274
lines changed

src/bin/pg_dump/parallel.c

Lines changed: 145 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -20,32 +20,25 @@
2020
* the desired number of worker processes, which each enter WaitForCommands().
2121
*
2222
* The master process dispatches an individual work item to one of the worker
23-
* processes in DispatchJobForTocEntry(). That calls
24-
* AH->MasterStartParallelItemPtr, a routine of the output format. This
25-
* function's arguments are the parents archive handle AH (containing the full
26-
* catalog information), the TocEntry that the worker should work on and a
27-
* T_Action value indicating whether this is a backup or a restore task. The
28-
* function simply converts the TocEntry assignment into a command string that
29-
* is then sent over to the worker process. In the simplest case that would be
30-
* something like "DUMP 1234", with 1234 being the TocEntry id.
31-
*
23+
* processes in DispatchJobForTocEntry(). We send a command string such as
24+
* "DUMP 1234" or "RESTORE 1234", where 1234 is the TocEntry ID.
3225
* The worker process receives and decodes the command and passes it to the
3326
* routine pointed to by AH->WorkerJobDumpPtr or AH->WorkerJobRestorePtr,
3427
* which are routines of the current archive format. That routine performs
35-
* the required action (dump or restore) and returns a malloc'd status string.
36-
* The status string is passed back to the master where it is interpreted by
37-
* AH->MasterEndParallelItemPtr, another format-specific routine. That
38-
* function can update format-specific information on the master's side,
39-
* depending on the reply from the worker process. In the end it returns a
40-
* status code, which we pass to the ParallelCompletionPtr callback function
41-
* that was passed to DispatchJobForTocEntry(). The callback function does
42-
* state updating for the master control logic in pg_backup_archiver.c.
28+
* the required action (dump or restore) and returns an integer status code.
29+
* This is passed back to the master where we pass it to the
30+
* ParallelCompletionPtr callback function that was passed to
31+
* DispatchJobForTocEntry(). The callback function does state updating
32+
* for the master control logic in pg_backup_archiver.c.
4333
*
44-
* Remember that we have forked off the workers only after we have read in
45-
* the catalog. That's why our worker processes can also access the catalog
46-
* information. (In the Windows case, the workers are threads in the same
47-
* process. To avoid problems, they work with cloned copies of the Archive
48-
* data structure; see RunWorker().)
34+
* In principle additional archive-format-specific information might be needed
35+
* in commands or worker status responses, but so far that hasn't proved
36+
* necessary, since workers have full copies of the ArchiveHandle/TocEntry
37+
* data structures. Remember that we have forked off the workers only after
38+
* we have read in the catalog. That's why our worker processes can also
39+
* access the catalog information. (In the Windows case, the workers are
40+
* threads in the same process. To avoid problems, they work with cloned
41+
* copies of the Archive data structure; see RunWorker().)
4942
*
5043
* In the master process, the workerStatus field for each worker has one of
5144
* the following values:
@@ -1073,6 +1066,110 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate)
10731066
free(pstate);
10741067
}
10751068

1069+
/*
1070+
* These next four functions handle construction and parsing of the command
1071+
* strings and response strings for parallel workers.
1072+
*
1073+
* Currently, these can be the same regardless of which archive format we are
1074+
* processing. In future, we might want to let format modules override these
1075+
* functions to add format-specific data to a command or response.
1076+
*/
1077+
1078+
/*
1079+
* buildWorkerCommand: format a command string to send to a worker.
1080+
*
1081+
* The string is built in the caller-supplied buffer of size buflen.
1082+
*/
1083+
static void
1084+
buildWorkerCommand(ArchiveHandle *AH, TocEntry *te, T_Action act,
1085+
char *buf, int buflen)
1086+
{
1087+
if (act == ACT_DUMP)
1088+
snprintf(buf, buflen, "DUMP %d", te->dumpId);
1089+
else if (act == ACT_RESTORE)
1090+
snprintf(buf, buflen, "RESTORE %d", te->dumpId);
1091+
else
1092+
Assert(false);
1093+
}
1094+
1095+
/*
1096+
* parseWorkerCommand: interpret a command string in a worker.
1097+
*/
1098+
static void
1099+
parseWorkerCommand(ArchiveHandle *AH, TocEntry **te, T_Action *act,
1100+
const char *msg)
1101+
{
1102+
DumpId dumpId;
1103+
int nBytes;
1104+
1105+
if (messageStartsWith(msg, "DUMP "))
1106+
{
1107+
*act = ACT_DUMP;
1108+
sscanf(msg, "DUMP %d%n", &dumpId, &nBytes);
1109+
Assert(nBytes == strlen(msg));
1110+
*te = getTocEntryByDumpId(AH, dumpId);
1111+
Assert(*te != NULL);
1112+
}
1113+
else if (messageStartsWith(msg, "RESTORE "))
1114+
{
1115+
*act = ACT_RESTORE;
1116+
sscanf(msg, "RESTORE %d%n", &dumpId, &nBytes);
1117+
Assert(nBytes == strlen(msg));
1118+
*te = getTocEntryByDumpId(AH, dumpId);
1119+
Assert(*te != NULL);
1120+
}
1121+
else
1122+
exit_horribly(modulename,
1123+
"unrecognized command received from master: \"%s\"\n",
1124+
msg);
1125+
}
1126+
1127+
/*
1128+
* buildWorkerResponse: format a response string to send to the master.
1129+
*
1130+
* The string is built in the caller-supplied buffer of size buflen.
1131+
*/
1132+
static void
1133+
buildWorkerResponse(ArchiveHandle *AH, TocEntry *te, T_Action act, int status,
1134+
char *buf, int buflen)
1135+
{
1136+
snprintf(buf, buflen, "OK %d %d %d",
1137+
te->dumpId,
1138+
status,
1139+
status == WORKER_IGNORED_ERRORS ? AH->public.n_errors : 0);
1140+
}
1141+
1142+
/*
1143+
* parseWorkerResponse: parse the status message returned by a worker.
1144+
*
1145+
* Returns the integer status code, and may update fields of AH and/or te.
1146+
*/
1147+
static int
1148+
parseWorkerResponse(ArchiveHandle *AH, TocEntry *te,
1149+
const char *msg)
1150+
{
1151+
DumpId dumpId;
1152+
int nBytes,
1153+
n_errors;
1154+
int status = 0;
1155+
1156+
if (messageStartsWith(msg, "OK "))
1157+
{
1158+
sscanf(msg, "OK %d %d %d%n", &dumpId, &status, &n_errors, &nBytes);
1159+
1160+
Assert(dumpId == te->dumpId);
1161+
Assert(nBytes == strlen(msg));
1162+
1163+
AH->public.n_errors += n_errors;
1164+
}
1165+
else
1166+
exit_horribly(modulename,
1167+
"invalid message received from worker: \"%s\"\n",
1168+
msg);
1169+
1170+
return status;
1171+
}
1172+
10761173
/*
10771174
* Dispatch a job to some free worker.
10781175
*
@@ -1091,18 +1188,16 @@ DispatchJobForTocEntry(ArchiveHandle *AH,
10911188
void *callback_data)
10921189
{
10931190
int worker;
1094-
char *arg;
1191+
char buf[256];
10951192

10961193
/* Get a worker, waiting if none are idle */
10971194
while ((worker = GetIdleWorker(pstate)) == NO_SLOT)
10981195
WaitForWorkers(AH, pstate, WFW_ONE_IDLE);
10991196

11001197
/* Construct and send command string */
1101-
arg = (AH->MasterStartParallelItemPtr) (AH, te, act);
1102-
1103-
sendMessageToWorker(pstate, worker, arg);
1198+
buildWorkerCommand(AH, te, act, buf, sizeof(buf));
11041199

1105-
/* XXX aren't we leaking string here? (no, because it's static. Ick.) */
1200+
sendMessageToWorker(pstate, worker, buf);
11061201

11071202
/* Remember worker is busy, and which TocEntry it's working on */
11081203
pstate->parallelSlot[worker].workerStatus = WRKR_WORKING;
@@ -1220,10 +1315,10 @@ static void
12201315
WaitForCommands(ArchiveHandle *AH, int pipefd[2])
12211316
{
12221317
char *command;
1223-
DumpId dumpId;
1224-
int nBytes;
1225-
char *str;
12261318
TocEntry *te;
1319+
T_Action act;
1320+
int status = 0;
1321+
char buf[256];
12271322

12281323
for (;;)
12291324
{
@@ -1233,47 +1328,29 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
12331328
return;
12341329
}
12351330

1236-
if (messageStartsWith(command, "DUMP "))
1237-
{
1238-
/* Decode the command */
1239-
sscanf(command + strlen("DUMP "), "%d%n", &dumpId, &nBytes);
1240-
Assert(nBytes == strlen(command) - strlen("DUMP "));
1241-
te = getTocEntryByDumpId(AH, dumpId);
1242-
Assert(te != NULL);
1331+
/* Decode the command */
1332+
parseWorkerCommand(AH, &te, &act, command);
12431333

1334+
if (act == ACT_DUMP)
1335+
{
12441336
/* Acquire lock on this table within the worker's session */
12451337
lockTableForWorker(AH, te);
12461338

12471339
/* Perform the dump command */
1248-
str = (AH->WorkerJobDumpPtr) (AH, te);
1249-
1250-
/* Return status to master */
1251-
sendMessageToMaster(pipefd, str);
1252-
1253-
/* we are responsible for freeing the status string */
1254-
free(str);
1340+
status = (AH->WorkerJobDumpPtr) (AH, te);
12551341
}
1256-
else if (messageStartsWith(command, "RESTORE "))
1342+
else if (act == ACT_RESTORE)
12571343
{
1258-
/* Decode the command */
1259-
sscanf(command + strlen("RESTORE "), "%d%n", &dumpId, &nBytes);
1260-
Assert(nBytes == strlen(command) - strlen("RESTORE "));
1261-
te = getTocEntryByDumpId(AH, dumpId);
1262-
Assert(te != NULL);
1263-
12641344
/* Perform the restore command */
1265-
str = (AH->WorkerJobRestorePtr) (AH, te);
1266-
1267-
/* Return status to master */
1268-
sendMessageToMaster(pipefd, str);
1269-
1270-
/* we are responsible for freeing the status string */
1271-
free(str);
1345+
status = (AH->WorkerJobRestorePtr) (AH, te);
12721346
}
12731347
else
1274-
exit_horribly(modulename,
1275-
"unrecognized command received from master: \"%s\"\n",
1276-
command);
1348+
Assert(false);
1349+
1350+
/* Return status to master */
1351+
buildWorkerResponse(AH, te, act, status, buf, sizeof(buf));
1352+
1353+
sendMessageToMaster(pipefd, buf);
12771354

12781355
/* command was pg_malloc'd and we are responsible for free()ing it. */
12791356
free(command);
@@ -1286,9 +1363,9 @@ WaitForCommands(ArchiveHandle *AH, int pipefd[2])
12861363
* If do_wait is true, wait to get a status message; otherwise, just return
12871364
* immediately if there is none available.
12881365
*
1289-
* When we get a status message, we let MasterEndParallelItemPtr process it,
1290-
* then pass the resulting status code to the callback function that was
1291-
* specified to DispatchJobForTocEntry, then reset the worker status to IDLE.
1366+
* When we get a status message, we pass the status code to the callback
1367+
* function that was specified to DispatchJobForTocEntry, then reset the
1368+
* worker status to IDLE.
12921369
*
12931370
* Returns true if we collected a status message, else false.
12941371
*
@@ -1318,29 +1395,10 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
13181395
{
13191396
ParallelSlot *slot = &pstate->parallelSlot[worker];
13201397
TocEntry *te = slot->te;
1321-
char *statusString;
13221398
int status;
13231399

1324-
if (messageStartsWith(msg, "OK RESTORE "))
1325-
{
1326-
statusString = msg + strlen("OK RESTORE ");
1327-
status =
1328-
(AH->MasterEndParallelItemPtr)
1329-
(AH, te, statusString, ACT_RESTORE);
1330-
slot->callback(AH, te, status, slot->callback_data);
1331-
}
1332-
else if (messageStartsWith(msg, "OK DUMP "))
1333-
{
1334-
statusString = msg + strlen("OK DUMP ");
1335-
status =
1336-
(AH->MasterEndParallelItemPtr)
1337-
(AH, te, statusString, ACT_DUMP);
1338-
slot->callback(AH, te, status, slot->callback_data);
1339-
}
1340-
else
1341-
exit_horribly(modulename,
1342-
"invalid message received from worker: \"%s\"\n",
1343-
msg);
1400+
status = parseWorkerResponse(AH, te, msg);
1401+
slot->callback(AH, te, status, slot->callback_data);
13441402
slot->workerStatus = WRKR_IDLE;
13451403
slot->te = NULL;
13461404
}
@@ -1364,8 +1422,8 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
13641422
* WFW_ONE_IDLE: wait for at least one worker to be idle
13651423
* WFW_ALL_IDLE: wait for all workers to be idle
13661424
*
1367-
* Any received results are passed to MasterEndParallelItemPtr and then
1368-
* to the callback specified to DispatchJobForTocEntry.
1425+
* Any received results are passed to the callback specified to
1426+
* DispatchJobForTocEntry.
13691427
*
13701428
* This function is executed in the master process.
13711429
*/

src/bin/pg_dump/pg_backup_archiver.h

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -161,12 +161,8 @@ typedef void (*PrintTocDataPtr) (ArchiveHandle *AH, TocEntry *te);
161161
typedef void (*ClonePtr) (ArchiveHandle *AH);
162162
typedef void (*DeClonePtr) (ArchiveHandle *AH);
163163

164-
typedef char *(*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te);
165-
typedef char *(*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te);
166-
typedef char *(*MasterStartParallelItemPtr) (ArchiveHandle *AH, TocEntry *te,
167-
T_Action act);
168-
typedef int (*MasterEndParallelItemPtr) (ArchiveHandle *AH, TocEntry *te,
169-
const char *str, T_Action act);
164+
typedef int (*WorkerJobDumpPtr) (ArchiveHandle *AH, TocEntry *te);
165+
typedef int (*WorkerJobRestorePtr) (ArchiveHandle *AH, TocEntry *te);
170166

171167
typedef size_t (*CustomOutPtr) (ArchiveHandle *AH, const void *buf, size_t len);
172168

@@ -266,9 +262,6 @@ struct _archiveHandle
266262
StartBlobPtr StartBlobPtr;
267263
EndBlobPtr EndBlobPtr;
268264

269-
MasterStartParallelItemPtr MasterStartParallelItemPtr;
270-
MasterEndParallelItemPtr MasterEndParallelItemPtr;
271-
272265
SetupWorkerPtr SetupWorkerPtr;
273266
WorkerJobDumpPtr WorkerJobDumpPtr;
274267
WorkerJobRestorePtr WorkerJobRestorePtr;

0 commit comments

Comments
 (0)