diff --git a/collector.c b/collector.c index 23cb7f0..e1e00cf 100644 --- a/collector.c +++ b/collector.c @@ -37,7 +37,7 @@ static void handle_sigterm(SIGNAL_ARGS); * Register background worker for collecting waits history. */ void -register_wait_collector(void) +pgws_register_wait_collector(void) { BackgroundWorker worker; @@ -48,7 +48,7 @@ register_wait_collector(void) worker.bgw_restart_time = 1; worker.bgw_notify_pid = 0; snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_wait_sampling"); - snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(collector_main)); + snprintf(worker.bgw_function_name, BGW_MAXLEN, CppAsString(pgws_collector_main)); snprintf(worker.bgw_name, BGW_MAXLEN, "pg_wait_sampling collector"); worker.bgw_main_arg = (Datum) 0; RegisterBackgroundWorker(&worker); @@ -57,7 +57,7 @@ register_wait_collector(void) /* * Allocate memory for waits history. */ -void +static void alloc_history(History *observations, int count) { observations->items = (HistoryItem *) palloc0(sizeof(HistoryItem) * count); @@ -151,7 +151,7 @@ probe_waits(History *observations, HTAB *profile_hash, TimestampTz ts = GetCurrentTimestamp(); /* Realloc waits history if needed */ - newSize = collector_hdr->historySize; + newSize = pgws_collector_hdr->historySize; if (observations->count != newSize) realloc_history(observations, newSize); @@ -173,8 +173,8 @@ probe_waits(History *observations, HTAB *profile_hash, item.pid = proc->pid; item.wait_event_info = proc->wait_event_info; - if (collector_hdr->profileQueries) - item.queryId = proc_queryids[i]; + if (pgws_collector_hdr->profileQueries) + item.queryId = pgws_proc_queryids[i]; else item.queryId = 0; @@ -292,7 +292,7 @@ make_profile_hash() hash_ctl.hash = tag_hash; hash_ctl.hcxt = TopMemoryContext; - if (collector_hdr->profileQueries) + if (pgws_collector_hdr->profileQueries) hash_ctl.keysize = offsetof(ProfileItem, count); else hash_ctl.keysize = offsetof(ProfileItem, queryId); @@ -321,7 +321,7 @@ millisecs_diff(TimestampTz tz1, TimestampTz tz2) * Main routine of wait history collector. */ void -collector_main(Datum main_arg) +pgws_collector_main(Datum main_arg) { HTAB *profile_hash = NULL; History observations; @@ -358,13 +358,13 @@ collector_main(Datum main_arg) pgstat_report_appname("pg_wait_sampling collector"); profile_hash = make_profile_hash(); - collector_hdr->latch = &MyProc->procLatch; + pgws_collector_hdr->latch = &MyProc->procLatch; CurrentResourceOwner = ResourceOwnerCreate(NULL, "pg_wait_sampling collector"); collector_context = AllocSetContextCreate(TopMemoryContext, "pg_wait_sampling context", ALLOCSET_DEFAULT_SIZES); old_context = MemoryContextSwitchTo(collector_context); - alloc_history(&observations, collector_hdr->historySize); + alloc_history(&observations, pgws_collector_hdr->historySize); MemoryContextSwitchTo(old_context); ereport(LOG, (errmsg("pg_wait_sampling collector started"))); @@ -391,8 +391,8 @@ collector_main(Datum main_arg) history_diff = millisecs_diff(history_ts, current_ts); profile_diff = millisecs_diff(profile_ts, current_ts); - history_period = collector_hdr->historyPeriod; - profile_period = collector_hdr->profilePeriod; + history_period = pgws_collector_hdr->historyPeriod; + profile_period = pgws_collector_hdr->profilePeriod; write_history = (history_diff >= (int64)history_period); write_profile = (profile_diff >= (int64)profile_period); @@ -400,7 +400,7 @@ collector_main(Datum main_arg) if (write_history || write_profile) { probe_waits(&observations, profile_hash, - write_history, write_profile, collector_hdr->profilePid); + write_history, write_profile, pgws_collector_hdr->profilePid); if (write_history) { @@ -439,24 +439,24 @@ collector_main(Datum main_arg) ResetLatch(&MyProc->procLatch); /* Handle request if any */ - if (collector_hdr->request != NO_REQUEST) + if (pgws_collector_hdr->request != NO_REQUEST) { LOCKTAG tag; SHMRequest request; - init_lock_tag(&tag, PGWS_COLLECTOR_LOCK); + pgws_init_lock_tag(&tag, PGWS_COLLECTOR_LOCK); LockAcquire(&tag, ExclusiveLock, false, false); - request = collector_hdr->request; - collector_hdr->request = NO_REQUEST; + request = pgws_collector_hdr->request; + pgws_collector_hdr->request = NO_REQUEST; if (request == HISTORY_REQUEST || request == PROFILE_REQUEST) { shm_mq_result mq_result; /* Send history or profile */ - shm_mq_set_sender(collector_mq, MyProc); - mqh = shm_mq_attach(collector_mq, NULL, NULL); + shm_mq_set_sender(pgws_collector_mq, MyProc); + mqh = shm_mq_attach(pgws_collector_mq, NULL, NULL); mq_result = shm_mq_wait_for_attach(mqh); switch (mq_result) { @@ -482,7 +482,7 @@ collector_main(Datum main_arg) default: AssertState(false); } - shm_mq_detach_compat(mqh, collector_mq); + shm_mq_detach_compat(mqh, pgws_collector_mq); } else if (request == PROFILE_RESET) { diff --git a/pg_wait_sampling.c b/pg_wait_sampling.c index 22732dd..c77f980 100644 --- a/pg_wait_sampling.c +++ b/pg_wait_sampling.c @@ -41,23 +41,21 @@ PG_MODULE_MAGIC; void _PG_init(void); -/* Global variables */ -bool shmem_initialized = false; +static bool shmem_initialized = false; /* Hooks */ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL; static planner_hook_type planner_hook_next = NULL; -/* Shared memory variables */ -shm_toc *toc = NULL; -shm_mq *collector_mq = NULL; -uint64 *proc_queryids = NULL; -CollectorShmqHeader *collector_hdr = NULL; +/* Pointers to shared memory objects */ +shm_mq *pgws_collector_mq = NULL; +uint64 *pgws_proc_queryids = NULL; +CollectorShmqHeader *pgws_collector_hdr = NULL; /* Receiver (backend) local shm_mq pointers and lock */ -shm_mq *recv_mq = NULL; -shm_mq_handle *recv_mqh = NULL; -LOCKTAG queueTag; +static shm_mq *recv_mq = NULL; +static shm_mq_handle *recv_mqh = NULL; +static LOCKTAG queueTag; #if PG_VERSION_NUM >= 150000 static shmem_request_hook_type prev_shmem_request_hook = NULL; @@ -218,63 +216,63 @@ setup_gucs() if (!strcmp(name, "pg_wait_sampling.history_size")) { history_size_found = true; - var->integer.variable = &collector_hdr->historySize; - collector_hdr->historySize = 5000; + var->integer.variable = &pgws_collector_hdr->historySize; + pgws_collector_hdr->historySize = 5000; } else if (!strcmp(name, "pg_wait_sampling.history_period")) { history_period_found = true; - var->integer.variable = &collector_hdr->historyPeriod; - collector_hdr->historyPeriod = 10; + var->integer.variable = &pgws_collector_hdr->historyPeriod; + pgws_collector_hdr->historyPeriod = 10; } else if (!strcmp(name, "pg_wait_sampling.profile_period")) { profile_period_found = true; - var->integer.variable = &collector_hdr->profilePeriod; - collector_hdr->profilePeriod = 10; + var->integer.variable = &pgws_collector_hdr->profilePeriod; + pgws_collector_hdr->profilePeriod = 10; } else if (!strcmp(name, "pg_wait_sampling.profile_pid")) { profile_pid_found = true; - var->_bool.variable = &collector_hdr->profilePid; - collector_hdr->profilePid = true; + var->_bool.variable = &pgws_collector_hdr->profilePid; + pgws_collector_hdr->profilePid = true; } else if (!strcmp(name, "pg_wait_sampling.profile_queries")) { profile_queries_found = true; - var->_bool.variable = &collector_hdr->profileQueries; - collector_hdr->profileQueries = true; + var->_bool.variable = &pgws_collector_hdr->profileQueries; + pgws_collector_hdr->profileQueries = true; } } if (!history_size_found) DefineCustomIntVariable("pg_wait_sampling.history_size", "Sets size of waits history.", NULL, - &collector_hdr->historySize, 5000, 100, INT_MAX, + &pgws_collector_hdr->historySize, 5000, 100, INT_MAX, PGC_SUSET, 0, shmem_int_guc_check_hook, NULL, NULL); if (!history_period_found) DefineCustomIntVariable("pg_wait_sampling.history_period", "Sets period of waits history sampling.", NULL, - &collector_hdr->historyPeriod, 10, 1, INT_MAX, + &pgws_collector_hdr->historyPeriod, 10, 1, INT_MAX, PGC_SUSET, 0, shmem_int_guc_check_hook, NULL, NULL); if (!profile_period_found) DefineCustomIntVariable("pg_wait_sampling.profile_period", "Sets period of waits profile sampling.", NULL, - &collector_hdr->profilePeriod, 10, 1, INT_MAX, + &pgws_collector_hdr->profilePeriod, 10, 1, INT_MAX, PGC_SUSET, 0, shmem_int_guc_check_hook, NULL, NULL); if (!profile_pid_found) DefineCustomBoolVariable("pg_wait_sampling.profile_pid", "Sets whether profile should be collected per pid.", NULL, - &collector_hdr->profilePid, true, + &pgws_collector_hdr->profilePid, true, PGC_SUSET, 0, shmem_bool_guc_check_hook, NULL, NULL); if (!profile_queries_found) DefineCustomBoolVariable("pg_wait_sampling.profile_queries", "Sets whether profile should be collected per query.", NULL, - &collector_hdr->profileQueries, true, + &pgws_collector_hdr->profileQueries, true, PGC_SUSET, 0, shmem_bool_guc_check_hook, NULL, NULL); if (history_size_found @@ -310,9 +308,10 @@ pgws_shmem_request(void) static void pgws_shmem_startup(void) { - bool found; - Size segsize = pgws_shmem_size(); - void *pgws; + bool found; + Size segsize = pgws_shmem_size(); + void *pgws; + shm_toc *toc; pgws = ShmemInitStruct("pg_wait_sampling", segsize, &found); @@ -320,14 +319,14 @@ pgws_shmem_startup(void) { toc = shm_toc_create(PG_WAIT_SAMPLING_MAGIC, pgws, segsize); - collector_hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader)); - shm_toc_insert(toc, 0, collector_hdr); - collector_mq = shm_toc_allocate(toc, COLLECTOR_QUEUE_SIZE); - shm_toc_insert(toc, 1, collector_mq); - proc_queryids = shm_toc_allocate(toc, + pgws_collector_hdr = shm_toc_allocate(toc, sizeof(CollectorShmqHeader)); + shm_toc_insert(toc, 0, pgws_collector_hdr); + pgws_collector_mq = shm_toc_allocate(toc, COLLECTOR_QUEUE_SIZE); + shm_toc_insert(toc, 1, pgws_collector_mq); + pgws_proc_queryids = shm_toc_allocate(toc, sizeof(uint64) * get_max_procs_count()); - shm_toc_insert(toc, 2, proc_queryids); - MemSet(proc_queryids, 0, sizeof(uint64) * get_max_procs_count()); + shm_toc_insert(toc, 2, pgws_proc_queryids); + MemSet(pgws_proc_queryids, 0, sizeof(uint64) * get_max_procs_count()); /* Initialize GUC variables in shared memory */ setup_gucs(); @@ -337,13 +336,13 @@ pgws_shmem_startup(void) toc = shm_toc_attach(PG_WAIT_SAMPLING_MAGIC, pgws); #if PG_VERSION_NUM >= 100000 - collector_hdr = shm_toc_lookup(toc, 0, false); - collector_mq = shm_toc_lookup(toc, 1, false); - proc_queryids = shm_toc_lookup(toc, 2, false); + pgws_collector_hdr = shm_toc_lookup(toc, 0, false); + pgws_collector_mq = shm_toc_lookup(toc, 1, false); + pgws_proc_queryids = shm_toc_lookup(toc, 2, false); #else - collector_hdr = shm_toc_lookup(toc, 0); - collector_mq = shm_toc_lookup(toc, 1); - proc_queryids = shm_toc_lookup(toc, 2); + pgws_collector_hdr = shm_toc_lookup(toc, 0); + pgws_collector_mq = shm_toc_lookup(toc, 1); + pgws_proc_queryids = shm_toc_lookup(toc, 2); #endif } @@ -356,7 +355,7 @@ pgws_shmem_startup(void) /* * Check shared memory is initialized. Report an error otherwise. */ -void +static void check_shmem(void) { if (!shmem_initialized) @@ -395,7 +394,7 @@ _PG_init(void) RequestAddinShmemSpace(pgws_shmem_size()); #endif - register_wait_collector(); + pgws_register_wait_collector(); /* * Install hooks. @@ -490,7 +489,7 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) item = ¶ms->items[0]; item->pid = proc->pid; item->wait_event_info = proc->wait_event_info; - item->queryId = proc_queryids[proc - ProcGlobal->allProcs]; + item->queryId = pgws_proc_queryids[proc - ProcGlobal->allProcs]; funcctx->max_calls = 1; } else @@ -508,7 +507,7 @@ pg_wait_sampling_get_current(PG_FUNCTION_ARGS) { params->items[j].pid = proc->pid; params->items[j].wait_event_info = proc->wait_event_info; - params->items[j].queryId = proc_queryids[i]; + params->items[j].queryId = pgws_proc_queryids[i]; j++; } } @@ -569,7 +568,7 @@ typedef struct } Profile; void -init_lock_tag(LOCKTAG *tag, uint32 lock) +pgws_init_lock_tag(LOCKTAG *tag, uint32 lock) { tag->locktag_field1 = PG_WAIT_SAMPLING_MAGIC; tag->locktag_field2 = lock; @@ -592,20 +591,20 @@ receive_array(SHMRequest request, Size item_size, Size *count) MemoryContext oldctx; /* Ensure nobody else trying to send request to queue */ - init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); + pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); LockAcquire(&queueTag, ExclusiveLock, false, false); - init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK); + pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK); LockAcquire(&collectorTag, ExclusiveLock, false, false); - recv_mq = shm_mq_create(collector_mq, COLLECTOR_QUEUE_SIZE); - collector_hdr->request = request; + recv_mq = shm_mq_create(pgws_collector_mq, COLLECTOR_QUEUE_SIZE); + pgws_collector_hdr->request = request; LockRelease(&collectorTag, ExclusiveLock, false); - if (!collector_hdr->latch) + if (!pgws_collector_hdr->latch) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("pg_wait_sampling collector wasn't started"))); - SetLatch(collector_hdr->latch); + SetLatch(pgws_collector_hdr->latch); shm_mq_set_receiver(recv_mq, MyProc); @@ -736,7 +735,7 @@ pg_wait_sampling_get_profile(PG_FUNCTION_ARGS) else nulls[2] = true; - if (collector_hdr->profileQueries) + if (pgws_collector_hdr->profileQueries) values[3] = Int64GetDatumFast(item->queryId); else values[3] = (Datum) 0; @@ -758,23 +757,22 @@ PG_FUNCTION_INFO_V1(pg_wait_sampling_reset_profile); Datum pg_wait_sampling_reset_profile(PG_FUNCTION_ARGS) { - LOCKTAG tag; - LOCKTAG tagCollector; + LOCKTAG collectorTag; check_shmem(); - init_lock_tag(&tag, PGWS_QUEUE_LOCK); + pgws_init_lock_tag(&queueTag, PGWS_QUEUE_LOCK); - LockAcquire(&tag, ExclusiveLock, false, false); + LockAcquire(&queueTag, ExclusiveLock, false, false); - init_lock_tag(&tagCollector, PGWS_COLLECTOR_LOCK); - LockAcquire(&tagCollector, ExclusiveLock, false, false); - collector_hdr->request = PROFILE_RESET; - LockRelease(&tagCollector, ExclusiveLock, false); + pgws_init_lock_tag(&collectorTag, PGWS_COLLECTOR_LOCK); + LockAcquire(&collectorTag, ExclusiveLock, false, false); + pgws_collector_hdr->request = PROFILE_RESET; + LockRelease(&collectorTag, ExclusiveLock, false); - SetLatch(collector_hdr->latch); + SetLatch(pgws_collector_hdr->latch); - LockRelease(&tag, ExclusiveLock, false); + LockRelease(&queueTag, ExclusiveLock, false); PG_RETURN_VOID(); } @@ -894,8 +892,8 @@ pgws_planner_hook(Query *parse, StaticAssertExpr(sizeof(parse->queryId) == sizeof(uint32), "queryId size is not uint32"); #endif - if (!proc_queryids[i]) - proc_queryids[i] = parse->queryId; + if (!pgws_proc_queryids[i]) + pgws_proc_queryids[i] = parse->queryId; } @@ -921,7 +919,7 @@ static void pgws_ExecutorEnd(QueryDesc *queryDesc) { if (MyProc) - proc_queryids[MyProc - ProcGlobal->allProcs] = UINT64CONST(0); + pgws_proc_queryids[MyProc - ProcGlobal->allProcs] = UINT64CONST(0); if (prev_ExecutorEnd) prev_ExecutorEnd(queryDesc); diff --git a/pg_wait_sampling.h b/pg_wait_sampling.h index a33d707..29425fc 100644 --- a/pg_wait_sampling.h +++ b/pg_wait_sampling.h @@ -71,16 +71,13 @@ typedef struct } CollectorShmqHeader; /* pg_wait_sampling.c */ -extern void check_shmem(void); -extern CollectorShmqHeader *collector_hdr; -extern shm_mq *collector_mq; -extern uint64 *proc_queryids; -extern void read_current_wait(PGPROC *proc, HistoryItem *item); -extern void init_lock_tag(LOCKTAG *tag, uint32 lock); +extern CollectorShmqHeader *pgws_collector_hdr; +extern shm_mq *pgws_collector_mq; +extern uint64 *pgws_proc_queryids; +extern void pgws_init_lock_tag(LOCKTAG *tag, uint32 lock); /* collector.c */ -extern void register_wait_collector(void); -extern void alloc_history(History *, int); -extern PGDLLEXPORT void collector_main(Datum main_arg); +extern void pgws_register_wait_collector(void); +extern PGDLLEXPORT void pgws_collector_main(Datum main_arg); #endif