Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a5fe473

Browse files
committed
Minor cleanup for access/transam/parallel.c.
ParallelMessagePending *must* be marked volatile, because it's set by a signal handler. On the other hand, it's pointless for HandleParallelMessageInterrupt to save/restore errno; that must be, and is, done at the outer level of the SIGUSR1 signal handler. Calling CHECK_FOR_INTERRUPTS() inside HandleParallelMessages, which itself is called from CHECK_FOR_INTERRUPTS(), seems both useless and hazardous. The comment claiming that this is needed to handle the error queue going away is certainly misguided, in any case. Improve a couple of error message texts, and use ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE to report loss of parallel worker connection, since that's what's used in e.g. tqueue.c. (Maybe it would be worth inventing a dedicated ERRCODE for this type of failure? But I do not think ERRCODE_INTERNAL_ERROR is appropriate.) Minor stylistic cleanups.
1 parent 887feef commit a5fe473

File tree

2 files changed

+18
-20
lines changed

2 files changed

+18
-20
lines changed

src/backend/access/transam/parallel.c

+12-13
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
#include "postgres.h"
1616

17+
#include "access/parallel.h"
1718
#include "access/xact.h"
1819
#include "access/xlog.h"
19-
#include "access/parallel.h"
2020
#include "catalog/namespace.h"
2121
#include "commands/async.h"
2222
#include "libpq/libpq.h"
@@ -35,6 +35,7 @@
3535
#include "utils/resowner.h"
3636
#include "utils/snapmgr.h"
3737

38+
3839
/*
3940
* We don't want to waste a lot of memory on an error queue which, most of
4041
* the time, will process only a handful of small messages. However, it is
@@ -94,7 +95,7 @@ typedef struct FixedParallelState
9495
int ParallelWorkerNumber = -1;
9596

9697
/* Is there a parallel message pending which we need to receive? */
97-
bool ParallelMessagePending = false;
98+
volatile bool ParallelMessagePending = false;
9899

99100
/* Are we initializing a parallel worker? */
100101
bool InitializingParallelWorker = false;
@@ -106,12 +107,13 @@ static FixedParallelState *MyFixedParallelState;
106107
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
107108

108109
/* Private functions. */
109-
static void HandleParallelMessage(ParallelContext *, int, StringInfo msg);
110+
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
110111
static void ParallelErrorContext(void *arg);
111112
static void ParallelExtensionTrampoline(dsm_segment *seg, shm_toc *toc);
112113
static void ParallelWorkerMain(Datum main_arg);
113114
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
114115

116+
115117
/*
116118
* Establish a new parallel context. This should be done after entering
117119
* parallel mode, and (unless there is an error) the context should be
@@ -681,17 +683,17 @@ ParallelContextActive(void)
681683

682684
/*
683685
* Handle receipt of an interrupt indicating a parallel worker message.
686+
*
687+
* Note: this is called within a signal handler! All we can do is set
688+
* a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
689+
* HandleParallelMessages().
684690
*/
685691
void
686692
HandleParallelMessageInterrupt(void)
687693
{
688-
int save_errno = errno;
689-
690694
InterruptPending = true;
691695
ParallelMessagePending = true;
692696
SetLatch(MyLatch);
693-
694-
errno = save_errno;
695697
}
696698

697699
/*
@@ -742,11 +744,8 @@ HandleParallelMessages(void)
742744
}
743745
else
744746
ereport(ERROR,
745-
(errcode(ERRCODE_INTERNAL_ERROR), /* XXX: wrong errcode? */
746-
errmsg("lost connection to parallel worker")));
747-
748-
/* This might make the error queue go away. */
749-
CHECK_FOR_INTERRUPTS();
747+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
748+
errmsg("lost connection to parallel worker")));
750749
}
751750
}
752751
}
@@ -833,7 +832,7 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
833832

834833
default:
835834
{
836-
elog(ERROR, "unknown message type: %c (%d bytes)",
835+
elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
837836
msgtype, msg->len);
838837
}
839838
}

src/include/access/parallel.h

+6-7
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include "postmaster/bgworker.h"
2020
#include "storage/shm_mq.h"
2121
#include "storage/shm_toc.h"
22-
#include "utils/elog.h"
2322

2423
typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
2524

@@ -47,25 +46,25 @@ typedef struct ParallelContext
4746
ParallelWorkerInfo *worker;
4847
} ParallelContext;
4948

50-
extern bool ParallelMessagePending;
49+
extern volatile bool ParallelMessagePending;
5150
extern int ParallelWorkerNumber;
5251
extern bool InitializingParallelWorker;
5352

5453
#define IsParallelWorker() (ParallelWorkerNumber >= 0)
5554

5655
extern ParallelContext *CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers);
5756
extern ParallelContext *CreateParallelContextForExternalFunction(char *library_name, char *function_name, int nworkers);
58-
extern void InitializeParallelDSM(ParallelContext *);
57+
extern void InitializeParallelDSM(ParallelContext *pcxt);
5958
extern void ReinitializeParallelDSM(ParallelContext *pcxt);
60-
extern void LaunchParallelWorkers(ParallelContext *);
61-
extern void WaitForParallelWorkersToFinish(ParallelContext *);
62-
extern void DestroyParallelContext(ParallelContext *);
59+
extern void LaunchParallelWorkers(ParallelContext *pcxt);
60+
extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
61+
extern void DestroyParallelContext(ParallelContext *pcxt);
6362
extern bool ParallelContextActive(void);
6463

6564
extern void HandleParallelMessageInterrupt(void);
6665
extern void HandleParallelMessages(void);
6766
extern void AtEOXact_Parallel(bool isCommit);
6867
extern void AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId);
69-
extern void ParallelWorkerReportLastRecEnd(XLogRecPtr);
68+
extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end);
7069

7170
#endif /* PARALLEL_H */

0 commit comments

Comments
 (0)