Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 2a8b40e

Browse files
author
Amit Kapila
committed
Simplify determining logical replication worker types.
We deduce a LogicalRepWorker's type from the values of several different fields ('relid' and 'leader_pid') whenever logic needs to know it. In fact, the logical replication worker type is already known at the time of launching the LogicalRepWorker and it never changes for the lifetime of that process. Instead of deducing the type, it is simpler to just store it one time, and access it directly thereafter. Author: Peter Smith Reviewed-by: Amit Kapila, Bharath Rupireddy Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
1 parent 3d8d217 commit 2a8b40e

File tree

5 files changed

+43
-17
lines changed

5 files changed

+43
-17
lines changed

src/backend/replication/logical/applyparallelworker.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
435435
return NULL;
436436
}
437437

438-
launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
438+
launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
439+
MyLogicalRepWorker->dbid,
439440
MySubscription->oid,
440441
MySubscription->name,
441442
MyLogicalRepWorker->userid,

src/backend/replication/logical/launcher.c

+21-10
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
303303
* Returns true on success, false on failure.
304304
*/
305305
bool
306-
logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
306+
logicalrep_worker_launch(LogicalRepWorkerType wtype,
307+
Oid dbid, Oid subid, const char *subname, Oid userid,
307308
Oid relid, dsm_handle subworker_dsm)
308309
{
309310
BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
315316
int nsyncworkers;
316317
int nparallelapplyworkers;
317318
TimestampTz now;
318-
bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
319-
320-
/* Sanity check - tablesync worker cannot be a subworker */
321-
Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
319+
bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
320+
bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
321+
322+
/*----------
323+
* Sanity checks:
324+
* - must be valid worker type
325+
* - tablesync workers are only ones to have relid
326+
* - parallel apply worker is the only kind of subworker
327+
*/
328+
Assert(wtype != WORKERTYPE_UNKNOWN);
329+
Assert(is_tablesync_worker == OidIsValid(relid));
330+
Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
322331

323332
ereport(DEBUG1,
324333
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
393402
* sync worker limit per subscription. So, just return silently as we
394403
* might get here because of an otherwise harmless race condition.
395404
*/
396-
if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
405+
if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
397406
{
398407
LWLockRelease(LogicalRepWorkerLock);
399408
return false;
@@ -427,6 +436,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
427436
}
428437

429438
/* Prepare the worker slot. */
439+
worker->type = wtype;
430440
worker->launch_time = now;
431441
worker->in_use = true;
432442
worker->generation++;
@@ -466,7 +476,7 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
466476
subid);
467477
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
468478
}
469-
else if (OidIsValid(relid))
479+
else if (is_tablesync_worker)
470480
{
471481
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
472482
snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
847857
{
848858
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
849859

850-
if (w->subid == subid && OidIsValid(w->relid))
860+
if (w->subid == subid && isTablesyncWorker(w))
851861
res++;
852862
}
853863

@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
11801190
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
11811191
{
11821192
ApplyLauncherSetWorkerStartTime(sub->oid, now);
1183-
logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
1193+
logicalrep_worker_launch(WORKERTYPE_APPLY,
1194+
sub->dbid, sub->oid, sub->name,
11841195
sub->owner, InvalidOid,
11851196
DSM_HANDLE_INVALID);
11861197
}
@@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
12901301
worker_pid = worker.proc->pid;
12911302

12921303
values[0] = ObjectIdGetDatum(worker.subid);
1293-
if (OidIsValid(worker.relid))
1304+
if (isTablesyncWorker(&worker))
12941305
values[1] = ObjectIdGetDatum(worker.relid);
12951306
else
12961307
nulls[1] = true;

src/backend/replication/logical/tablesync.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
587587
TimestampDifferenceExceeds(hentry->last_start_time, now,
588588
wal_retrieve_retry_interval))
589589
{
590-
logicalrep_worker_launch(MyLogicalRepWorker->dbid,
590+
logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
591+
MyLogicalRepWorker->dbid,
591592
MySubscription->oid,
592593
MySubscription->name,
593594
MyLogicalRepWorker->userid,

src/include/replication/worker_internal.h

+17-5
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,20 @@
2727
#include "storage/shm_toc.h"
2828
#include "storage/spin.h"
2929

30+
/* Different types of worker */
31+
typedef enum LogicalRepWorkerType
32+
{
33+
WORKERTYPE_UNKNOWN = 0,
34+
WORKERTYPE_TABLESYNC,
35+
WORKERTYPE_APPLY,
36+
WORKERTYPE_PARALLEL_APPLY
37+
} LogicalRepWorkerType;
3038

3139
typedef struct LogicalRepWorker
3240
{
41+
/* What type of worker is this? */
42+
LogicalRepWorkerType type;
43+
3344
/* Time at which this worker was launched. */
3445
TimestampTz launch_time;
3546

@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
232243
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
233244
bool only_running);
234245
extern List *logicalrep_workers_find(Oid subid, bool only_running);
235-
extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
246+
extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
247+
Oid dbid, Oid subid, const char *subname,
236248
Oid userid, Oid relid,
237249
dsm_handle subworker_dsm);
238250
extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,19 @@ extern void pa_decr_and_wait_stream_block(void);
315327
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
316328
XLogRecPtr remote_lsn);
317329

318-
#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
330+
#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
331+
#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
319332

320333
static inline bool
321334
am_tablesync_worker(void)
322335
{
323-
return OidIsValid(MyLogicalRepWorker->relid);
336+
return isTablesyncWorker(MyLogicalRepWorker);
324337
}
325338

326339
static inline bool
327340
am_leader_apply_worker(void)
328341
{
329-
return (!am_tablesync_worker() &&
330-
!isParallelApplyWorker(MyLogicalRepWorker));
342+
return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
331343
}
332344

333345
static inline bool

src/tools/pgindent/typedefs.list

+1
Original file line numberDiff line numberDiff line change
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
15001500
LogicalRepTupleData
15011501
LogicalRepTyp
15021502
LogicalRepWorker
1503+
LogicalRepWorkerType
15031504
LogicalRewriteMappingData
15041505
LogicalTape
15051506
LogicalTapeSet

0 commit comments

Comments
 (0)