* WRKR_IDLE: it's waiting for a command
* WRKR_WORKING: it's been sent a command
* WRKR_FINISHED: it's returned a result
- * WRKR_TERMINATED: process ended
+ * WRKR_TERMINATED: process ended (or not started yet)
* The FINISHED state indicates that the worker is idle, but we've not yet
* dealt with the status code it returned from the prior command.
* ReapWorkerStatus() extracts the unhandled command status value and sets
/*
* Close our write end of the sockets so that any workers waiting for
- * commands know they can exit.
+ * commands know they can exit. (Note: some of the pipeWrite fields might
+ * still be zero, if we failed to initialize all the workers. Hence, just
+ * ignore errors here.)
*/
for (i = 0; i < pstate->numWorkers; i++)
closesocket(pstate->parallelSlot[i].pipeWrite);
for (j = 0; j < pstate->numWorkers; j++)
{
- if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+ if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
{
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
nrun++;
if (AH->public.numWorkers == 1)
return pstate;
+ /* Create status array, being sure to initialize all fields to 0 */
pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize);
memset((void *) pstate->parallelSlot, 0, slotSize);
int pipeMW[2],
pipeWM[2];
+ slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
+ slot->args->AH = NULL;
+ slot->args->te = NULL;
+
/* Create communication pipes for this worker */
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
exit_horribly(modulename,
"could not create communication channels: %s\n",
strerror(errno));
- slot->workerStatus = WRKR_IDLE;
- slot->args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
- slot->args->AH = NULL;
- slot->args->te = NULL;
-
/* master's ends of the pipes */
slot->pipeRead = pipeWM[PIPE_READ];
slot->pipeWrite = pipeMW[PIPE_WRITE];
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(slot->threadId));
slot->hThread = handle;
+ slot->workerStatus = WRKR_IDLE;
#else /* !WIN32 */
pid = fork();
if (pid == 0)
/* In Master after successful fork */
slot->pid = pid;
+ slot->workerStatus = WRKR_IDLE;
/* close read end of Master -> Worker */
closesocket(pipeMW[PIPE_READ]);
}
/*
- * Return true iff every worker is in the WRKR_TERMINATED state.
+ * Return true iff no worker is running.
*/
static bool
HasEveryWorkerTerminated(ParallelState *pstate)
for (i = 0; i < pstate->numWorkers; i++)
{
- if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+ if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
return false;
}
return true;
FD_ZERO(&workerset);
for (i = 0; i < pstate->numWorkers; i++)
{
- if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+ if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
continue;
FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
if (pstate->parallelSlot[i].pipeRead > maxFd)
{
char *msg;
+ if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+ continue;
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
continue;