From 0c582c9fecf6ffae13c4496d6d3ff75fde2cec1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Thu, 19 Sep 2024 09:43:37 -0300 Subject: [PATCH] Background worker for collect metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Fabrízio de Royes Mello --- metrics_grpc.cpp | 20 ++++- metrics_grpc.h | 2 + pgotel.cpp | 217 ++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 198 insertions(+), 41 deletions(-) diff --git a/metrics_grpc.cpp b/metrics_grpc.cpp index 10ab4e2..1914f6d 100644 --- a/metrics_grpc.cpp +++ b/metrics_grpc.cpp @@ -17,11 +17,17 @@ namespace pgotel { otlp_exporter::OtlpGrpcMetricExporterOptions options; std::unique_ptr reader; + bool initialized = false; void InitMetrics(std::string endpoint, std::chrono::milliseconds interval, std::chrono::milliseconds timeout) { + if (pgotel::initialized) + { + return; + } + pgotel::options.endpoint = endpoint; auto exporter = otlp_exporter::OtlpGrpcMetricExporterFactory::Create(options); @@ -43,13 +49,19 @@ namespace pgotel std::shared_ptr provider(std::move(u_provider)); metrics_api::Provider::SetMeterProvider(provider); + + pgotel::initialized = true; } void CleanupMetrics() { - std::shared_ptr none; - metrics_api::Provider::SetMeterProvider(none); + if (pgotel::initialized) + { + std::shared_ptr none; + metrics_api::Provider::SetMeterProvider(none); + pgotel::initialized = false; + } } void @@ -63,6 +75,10 @@ namespace pgotel void Counter(const std::string &name, double value, std::map labels) { + if (!initialized) + { + return; + } std::string counter_name = "counter." + name; auto provider = metrics_api::Provider::GetMeterProvider(); opentelemetry::nostd::shared_ptr meter = diff --git a/metrics_grpc.h b/metrics_grpc.h index a6b9699..0127dae 100644 --- a/metrics_grpc.h +++ b/metrics_grpc.h @@ -11,6 +11,8 @@ *------------------------------------------------------------------------- */ +#pragma once + #include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter_factory.h" #include "opentelemetry/metrics/provider.h" #include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h" diff --git a/pgotel.cpp b/pgotel.cpp index 56db82b..497de8a 100644 --- a/pgotel.cpp +++ b/pgotel.cpp @@ -18,24 +18,42 @@ extern "C" { #endif #include "postgres.h" + #include "fmgr.h" +#include "executor/spi.h" #include "miscadmin.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/jsonb.h" +#include "utils/snapmgr.h" + +/* Those are necessary for bgworker */ +#include "pgstat.h" +#include "postmaster/bgworker.h" #include "storage/ipc.h" +#include "storage/latch.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/shmem.h" PG_MODULE_MAGIC; PG_FUNCTION_INFO_V1(pgotel_counter); void _PG_init(void); +PGDLLEXPORT void pgotel_worker_main(Datum main_arg) pg_attribute_noreturn(); /* GUC variables */ static bool pgotel_enabled = false; static int pgotel_interval = 2000; static int pgotel_timeout = 500; static char *pgotel_endpoint = NULL; +static int pgotel_worker_idle_time = 100; +static char *pgotel_dbname = NULL; + +/* Signal handling */ +static volatile sig_atomic_t got_sigterm = false; +static volatile sig_atomic_t got_sighup = false; Datum pgotel_counter(PG_FUNCTION_ARGS) @@ -84,42 +102,35 @@ pgotel_counter(PG_FUNCTION_ARGS) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid labels JSONB"))); /* Send the counter to the OTEL-Collector */ + pgotel::InitMetrics(pgotel_endpoint, + std::chrono::milliseconds(pgotel_interval), + std::chrono::milliseconds(pgotel_timeout)); pgotel::Counter(counter_name, value, labels_map); PG_RETURN_NULL(); } static void -pgotel_assign_interval_guc(int newval, void *extra) -{ - elog(DEBUG1, "pgotel_interval: %d", newval); - elog(DEBUG1, "pgotel_timeout: %d", pgotel_timeout); - - pgotel::RestartMetrics(pgotel_endpoint, - std::chrono::milliseconds(newval), - std::chrono::milliseconds(pgotel_timeout)); -} - -static void -pgotel_assign_timeout_guc(int newval, void *extra) -{ - elog(DEBUG1, "pgotel_interval: %d", pgotel_interval); - elog(DEBUG1, "pgotel_timeout: %d", newval); - - pgotel::RestartMetrics(pgotel_endpoint, - std::chrono::milliseconds(pgotel_interval), - std::chrono::milliseconds(newval)); -} - -static void -load_params(void) +pgotel_define_gucs(void) { DefineCustomStringVariable("pgotel.endpoint", "Endpoint of the OTEL-Collector", NULL, &pgotel_endpoint, "localhost:4317", - PGC_USERSET, + PGC_SIGHUP, + 0, + NULL, + NULL, + NULL); + + DefineCustomStringVariable("pgotel.dbname", + "Database for the worker collect information to send to the " + "OTEL-Collector", + NULL, + &pgotel_dbname, + "postgres", + PGC_SIGHUP, 0, NULL, NULL, @@ -144,10 +155,10 @@ load_params(void) 2000, 1000, INT_MAX, - PGC_USERSET, + PGC_SIGHUP, 0, NULL, - pgotel_assign_interval_guc, + NULL, NULL); DefineCustomIntVariable("pgotel.timeout", @@ -157,35 +168,163 @@ load_params(void) 500, 100, INT_MAX, - PGC_USERSET, + PGC_SIGHUP, 0, NULL, - pgotel_assign_timeout_guc, + NULL, NULL); } -/* - * _PG_init - * Entry point loading hooks - */ +static void +pgotel_worker_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + errno = save_errno; +} + +static void +pgotel_worker_sighup(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sighup = true; + if (MyProc) + SetLatch(&MyProc->procLatch); + errno = save_errno; +} + void -_PG_init(void) +pgotel_worker_main(Datum main_arg) { - load_params(); - elog(DEBUG1, "endpoint: %s", pgotel_endpoint); + StringInfoData buf; + + /* Register functions for SIGTERM/SIGHUP management */ + pqsignal(SIGHUP, pgotel_worker_sighup); + pqsignal(SIGTERM, pgotel_worker_sigterm); + + /* We're now ready to receive signals */ + BackgroundWorkerUnblockSignals(); + + /* Connect to a database */ + BackgroundWorkerInitializeConnection(pgotel_dbname, NULL, 0); + + elog(LOG, "pgotel_worker: started and connected to %s database", pgotel_dbname); + + /* Build the query string */ + initStringInfo(&buf); + appendStringInfo(&buf, "\ + SELECT pgotel_counter( \ + 'idx_scan', idx_scan, \ + jsonb_build_object('dbname', current_database(), 'schemaname', schemaname, 'relname', relname)) \ + FROM pg_catalog.pg_stat_user_tables \ + WHERE coalesce(idx_scan, 0) > 0 \ + UNION ALL \ + SELECT pgotel_counter( \ + 'seq_scan', seq_scan, \ + jsonb_build_object('dbname', current_database(), 'schemaname', schemaname, 'relname', relname)) \ + FROM pg_catalog.pg_stat_user_tables \ + WHERE coalesce(seq_scan, 0) > 0"); + + /* Initialize OpenTelemetry Metrics SDK */ pgotel::InitMetrics(pgotel_endpoint, std::chrono::milliseconds(pgotel_interval), std::chrono::milliseconds(pgotel_timeout)); + + while (!got_sigterm) + { + /* Wait necessary amount of time */ + WaitLatch(&MyProc->procLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + pgotel_worker_idle_time * 1L, + PG_WAIT_EXTENSION); + ResetLatch(&MyProc->procLatch); + + /* Process signals */ + if (got_sighup) + { + /* Process config file */ + ProcessConfigFile(PGC_SIGHUP); + got_sighup = false; + + /* Restart OpenTelemetry Metrics SDK */ + pgotel::CleanupMetrics(); + pgotel::InitMetrics(pgotel_endpoint, + std::chrono::milliseconds(pgotel_interval), + std::chrono::milliseconds(pgotel_timeout)); + + ereport(LOG, (errmsg("%s: processed SIGHUP", "pgotel_worker"))); + } + + if (got_sigterm) + { + /* Simply exit */ + ereport(LOG, (errmsg("%s: processed SIGTERM", "pgotel_worker"))); + break; + } + + /* Update NOW() to return correct timestamp */ + SetCurrentStatementStartTimestamp(); + + /* Show query status in pg_stat_activity */ + pgstat_report_activity(STATE_RUNNING, "pgotel_worker_main"); + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* Execute NOTIFY requests */ + int ret = SPI_execute(buf.data, false, 0); + if (ret != SPI_OK_SELECT) + elog(FATAL, "pgotel_worker: SPI_execute failed with error code %d", ret); + // elog(LOG, "pgotel_worker: executed " UINT64_FORMAT, SPI_processed); + + /* Terminate transaction */ + SPI_finish(); + PopActiveSnapshot(); + + /* Notifications are sent by the transaction commit */ + CommitTransactionCommand(); + + pgstat_report_activity(STATE_IDLE, NULL); + } + + /* No problems, so clean exit */ + pgotel::CleanupMetrics(); + proc_exit(0); +} + +static void +pgotel_register_worker() +{ + BackgroundWorker worker; + + /* Worker parameter and registration */ + MemSet(&worker, 0, sizeof(BackgroundWorker)); + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + snprintf(worker.bgw_library_name, BGW_MAXLEN, "pgotel"); + snprintf(worker.bgw_function_name, BGW_MAXLEN, "pgotel_worker_main"); + snprintf(worker.bgw_name, BGW_MAXLEN, "%s", "PGOTEL Worker"); + + /* Wait 10 seconds for restart before crash */ + worker.bgw_restart_time = 10; + worker.bgw_main_arg = (Datum) 0; + worker.bgw_notify_pid = 0; + RegisterBackgroundWorker(&worker); } /* - * _PG_fini - * Exit point unloading hooks + * _PG_init + * Entry point loading hooks */ void -_PG_fini(void) +_PG_init(void) { - pgotel::CleanupMetrics(); + pgotel_define_gucs(); + pgotel_register_worker(); } /* declaration out of file scope */