Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila2023-08-14 03:08:03 +0000
committerAmit Kapila2023-08-14 03:08:03 +0000
commit2a8b40e3681921943a2989fd4ec6cdbf8766566c (patch)
treea5c55ec41deb79663ab7b9585f63bd6b1dcd3ad7 /src/backend
parent3d8d217450a63638825167c17ed791122f376176 (diff)
Simplify determining logical replication worker types.
We deduce a LogicalRepWorker's type from the values of several different fields ('relid' and 'leader_pid') whenever logic needs to know it. In fact, the logical replication worker type is already known at the time of launching the LogicalRepWorker and it never changes for the lifetime of that process. Instead of deducing the type, it is simpler to just store it one time, and access it directly thereafter. Author: Peter Smith Reviewed-by: Amit Kapila, Bharath Rupireddy Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/logical/applyparallelworker.c3
-rw-r--r--src/backend/replication/logical/launcher.c31
-rw-r--r--src/backend/replication/logical/tablesync.c3
3 files changed, 25 insertions, 12 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 1d4e83c4c1f..4e8ee2973e0 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
return NULL;
}
- launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+ MyLogicalRepWorker->dbid,
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index e231fa7f951..7cc0a16d3bc 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
* Returns true on success, false on failure.
*/
bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+ Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
int nsyncworkers;
int nparallelapplyworkers;
TimestampTz now;
- bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
-
- /* Sanity check - tablesync worker cannot be a subworker */
- Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+ bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+ bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+
+ /*----------
+ * Sanity checks:
+ * - must be valid worker type
+ * - tablesync workers are only ones to have relid
+ * - parallel apply worker is the only kind of subworker
+ */
+ Assert(wtype != WORKERTYPE_UNKNOWN);
+ Assert(is_tablesync_worker == OidIsValid(relid));
+ Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+ if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
@@ -427,6 +436,7 @@ retry:
}
/* Prepare the worker slot. */
+ worker->type = wtype;
worker->launch_time = now;
worker->in_use = true;
worker->generation++;
@@ -466,7 +476,7 @@ retry:
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
}
- else if (OidIsValid(relid))
+ else if (is_tablesync_worker)
{
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && OidIsValid(w->relid))
+ if (w->subid == subid && isTablesyncWorker(w))
res++;
}
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID);
}
@@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
- if (OidIsValid(worker.relid))
+ if (isTablesyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);
else
nulls[1] = true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 651a7750653..67bdd14095e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+ logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->dbid,
MySubscription->oid,
MySubscription->name,
MyLogicalRepWorker->userid,