Skip to content

Commit

Permalink
Background worker for collect metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Fabrízio de Royes Mello <[email protected]>
  • Loading branch information
fabriziomello committed Sep 20, 2024
1 parent c178994 commit 0c582c9
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 41 deletions.
20 changes: 18 additions & 2 deletions metrics_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ namespace pgotel
{
/* Exporter options; InitMetrics() stores the endpoint here and passes them to the exporter factory. */
otlp_exporter::OtlpGrpcMetricExporterOptions options;
/* Metric reader handle. NOTE(review): its assignment is not visible in this view - confirm where it is set. */
std::unique_ptr<metric_sdk::MetricReader> reader;
/* Set once InitMetrics() has installed a meter provider; cleared again by CleanupMetrics(). */
bool initialized = false;

void
InitMetrics(std::string endpoint, std::chrono::milliseconds interval,
std::chrono::milliseconds timeout)
{
if (pgotel::initialized)
{
return;
}

pgotel::options.endpoint = endpoint;
auto exporter = otlp_exporter::OtlpGrpcMetricExporterFactory::Create(options);

Expand All @@ -43,13 +49,19 @@ namespace pgotel
std::shared_ptr<opentelemetry::metrics::MeterProvider> provider(std::move(u_provider));

metrics_api::Provider::SetMeterProvider(provider);

pgotel::initialized = true;
}

/*
 * Detach the meter provider that InitMetrics() installed.
 *
 * Does nothing unless InitMetrics() completed earlier. The reset is done
 * only inside the guard: an additional unconditional reset ahead of it would
 * make the guard meaningless and replace the provider twice.
 */
void
CleanupMetrics()
{
    if (!pgotel::initialized)
    {
        return;
    }

    /* An empty provider pointer replaces the one set by InitMetrics() */
    std::shared_ptr<metrics_api::MeterProvider> none;
    metrics_api::Provider::SetMeterProvider(none);

    /* Allow a later InitMetrics() call to set everything up again */
    pgotel::initialized = false;
}

void
Expand All @@ -63,6 +75,10 @@ namespace pgotel
void
Counter(const std::string &name, double value, std::map<std::string, std::string> labels)
{
if (!initialized)
{
return;
}
std::string counter_name = "counter." + name;
auto provider = metrics_api::Provider::GetMeterProvider();
opentelemetry::nostd::shared_ptr<metrics_api::Meter> meter =
Expand Down
2 changes: 2 additions & 0 deletions metrics_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*-------------------------------------------------------------------------
*/

#pragma once

#include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter_factory.h"
#include "opentelemetry/metrics/provider.h"
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
Expand Down
217 changes: 178 additions & 39 deletions pgotel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,42 @@ extern "C"
{
#endif
#include "postgres.h"

#include "fmgr.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/jsonb.h"
#include "utils/snapmgr.h"

/* Those are necessary for bgworker */
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(pgotel_counter);

void _PG_init(void);
PGDLLEXPORT void pgotel_worker_main(Datum main_arg) pg_attribute_noreturn();

/*
 * GUC variables
 *
 * Backing storage for the extension's configuration parameters.
 */
/* NOTE(review): no use of this flag is visible in this view - confirm where it is checked. */
static bool pgotel_enabled = false;
/* Passed to the metrics SDK setup as std::chrono::milliseconds. */
static int pgotel_interval = 2000;
/* Passed to the metrics SDK setup as std::chrono::milliseconds. */
static int pgotel_timeout = 500;
/* Backs "pgotel.endpoint": endpoint of the OTEL-Collector. */
static char *pgotel_endpoint = NULL;
/* Timeout handed to WaitLatch() in the worker loop between two collection rounds. */
static int pgotel_worker_idle_time = 100;
/* Backs "pgotel.dbname": database the background worker connects to. */
static char *pgotel_dbname = NULL;

/*
 * Signal handling
 *
 * Flags set by the worker's signal handlers and polled by its main loop.
 */
static volatile sig_atomic_t got_sigterm = false;
static volatile sig_atomic_t got_sighup = false;

Datum
pgotel_counter(PG_FUNCTION_ARGS)
Expand Down Expand Up @@ -84,42 +102,35 @@ pgotel_counter(PG_FUNCTION_ARGS)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid labels JSONB")));

/* Send the counter to the OTEL-Collector */
pgotel::InitMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(pgotel_timeout));
pgotel::Counter(counter_name, value, labels_map);

PG_RETURN_NULL();
}

/*
 * pgotel_assign_interval_guc
 *      Assign hook for the export interval: log both timing settings and
 *      restart the metrics pipeline with the new interval.
 *
 * newval is the interval being assigned; the timeout is taken from the
 * current value of pgotel_timeout. extra is unused.
 */
static void
pgotel_assign_interval_guc(int newval, void *extra)
{
    const std::chrono::milliseconds export_interval(newval);
    const std::chrono::milliseconds export_timeout(pgotel_timeout);

    elog(DEBUG1, "pgotel_interval: %d", newval);
    elog(DEBUG1, "pgotel_timeout: %d", pgotel_timeout);

    pgotel::RestartMetrics(pgotel_endpoint, export_interval, export_timeout);
}

/*
 * pgotel_assign_timeout_guc
 *      Assign hook for the export timeout: log both timing settings and
 *      restart the metrics pipeline with the new timeout.
 *
 * newval is the timeout being assigned; the interval is taken from the
 * current value of pgotel_interval. extra is unused.
 */
static void
pgotel_assign_timeout_guc(int newval, void *extra)
{
    const std::chrono::milliseconds export_interval(pgotel_interval);
    const std::chrono::milliseconds export_timeout(newval);

    elog(DEBUG1, "pgotel_interval: %d", pgotel_interval);
    elog(DEBUG1, "pgotel_timeout: %d", newval);

    pgotel::RestartMetrics(pgotel_endpoint, export_interval, export_timeout);
}

static void
load_params(void)
pgotel_define_gucs(void)
{
DefineCustomStringVariable("pgotel.endpoint",
"Endpoint of the OTEL-Collector",
NULL,
&pgotel_endpoint,
"localhost:4317",
PGC_USERSET,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);

DefineCustomStringVariable("pgotel.dbname",
"Database for the worker collect information to send to the "
"OTEL-Collector",
NULL,
&pgotel_dbname,
"postgres",
PGC_SIGHUP,
0,
NULL,
NULL,
Expand All @@ -144,10 +155,10 @@ load_params(void)
2000,
1000,
INT_MAX,
PGC_USERSET,
PGC_SIGHUP,
0,
NULL,
pgotel_assign_interval_guc,
NULL,
NULL);

DefineCustomIntVariable("pgotel.timeout",
Expand All @@ -157,35 +168,163 @@ load_params(void)
500,
100,
INT_MAX,
PGC_USERSET,
PGC_SIGHUP,
0,
NULL,
pgotel_assign_timeout_guc,
NULL,
NULL);
}

/*
 * pgotel_worker_sigterm
 *      SIGTERM handler: record the shutdown request in got_sigterm and set
 *      the process latch so the worker loop notices it.
 */
static void
pgotel_worker_sigterm(SIGNAL_ARGS)
{
    /* Keep the interrupted code's errno intact across the handler */
    const int saved_errno = errno;

    got_sigterm = true;

    /* The latch lives in MyProc, so it can only be set once MyProc exists */
    if (MyProc != NULL)
    {
        SetLatch(&MyProc->procLatch);
    }

    errno = saved_errno;
}

/*
 * pgotel_worker_sighup
 *      SIGHUP handler: record the reload request in got_sighup and set the
 *      process latch so the worker loop notices it.
 */
static void
pgotel_worker_sighup(SIGNAL_ARGS)
{
    /* Keep the interrupted code's errno intact across the handler */
    const int saved_errno = errno;

    got_sighup = true;

    /* The latch lives in MyProc, so it can only be set once MyProc exists */
    if (MyProc != NULL)
    {
        SetLatch(&MyProc->procLatch);
    }

    errno = saved_errno;
}

void
_PG_init(void)
pgotel_worker_main(Datum main_arg)
{
load_params();
elog(DEBUG1, "endpoint: %s", pgotel_endpoint);
StringInfoData buf;

/* Register functions for SIGTERM/SIGHUP management */
pqsignal(SIGHUP, pgotel_worker_sighup);
pqsignal(SIGTERM, pgotel_worker_sigterm);

/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();

/* Connect to a database */
BackgroundWorkerInitializeConnection(pgotel_dbname, NULL, 0);

elog(LOG, "pgotel_worker: started and connected to %s database", pgotel_dbname);

/* Build the query string */
initStringInfo(&buf);
appendStringInfo(&buf, "\
SELECT pgotel_counter( \
'idx_scan', idx_scan, \
jsonb_build_object('dbname', current_database(), 'schemaname', schemaname, 'relname', relname)) \
FROM pg_catalog.pg_stat_user_tables \
WHERE coalesce(idx_scan, 0) > 0 \
UNION ALL \
SELECT pgotel_counter( \
'seq_scan', seq_scan, \
jsonb_build_object('dbname', current_database(), 'schemaname', schemaname, 'relname', relname)) \
FROM pg_catalog.pg_stat_user_tables \
WHERE coalesce(seq_scan, 0) > 0");

/* Initialize OpenTelemetry Metrics SDK */
pgotel::InitMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(pgotel_timeout));

while (!got_sigterm)
{
/* Wait necessary amount of time */
WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
pgotel_worker_idle_time * 1L,
PG_WAIT_EXTENSION);
ResetLatch(&MyProc->procLatch);

/* Process signals */
if (got_sighup)
{
/* Process config file */
ProcessConfigFile(PGC_SIGHUP);
got_sighup = false;

/* Restart OpenTelemetry Metrics SDK */
pgotel::CleanupMetrics();
pgotel::InitMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(pgotel_timeout));

ereport(LOG, (errmsg("%s: processed SIGHUP", "pgotel_worker")));
}

if (got_sigterm)
{
/* Simply exit */
ereport(LOG, (errmsg("%s: processed SIGTERM", "pgotel_worker")));
break;
}

/* Update NOW() to return correct timestamp */
SetCurrentStatementStartTimestamp();

/* Show query status in pg_stat_activity */
pgstat_report_activity(STATE_RUNNING, "pgotel_worker_main");
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());

/* Execute NOTIFY requests */
int ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "pgotel_worker: SPI_execute failed with error code %d", ret);
// elog(LOG, "pgotel_worker: executed " UINT64_FORMAT, SPI_processed);

/* Terminate transaction */
SPI_finish();
PopActiveSnapshot();

/* Notifications are sent by the transaction commit */
CommitTransactionCommand();

pgstat_report_activity(STATE_IDLE, NULL);
}

/* No problems, so clean exit */
pgotel::CleanupMetrics();
proc_exit(0);
}

static void
pgotel_register_worker()
{
BackgroundWorker worker;

/* Worker parameter and registration */
MemSet(&worker, 0, sizeof(BackgroundWorker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(worker.bgw_library_name, BGW_MAXLEN, "pgotel");
snprintf(worker.bgw_function_name, BGW_MAXLEN, "pgotel_worker_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "%s", "PGOTEL Worker");

/* Wait 10 seconds for restart before crash */
worker.bgw_restart_time = 10;
worker.bgw_main_arg = (Datum) 0;
worker.bgw_notify_pid = 0;
RegisterBackgroundWorker(&worker);
}

/*
* _PG_fini
* Exit point unloading hooks
* _PG_init
* Entry point loading hooks
*/
void
_PG_fini(void)
_PG_init(void)
{
pgotel::CleanupMetrics();
pgotel_define_gucs();
pgotel_register_worker();
}

/* declaration out of file scope */
Expand Down

0 comments on commit 0c582c9

Please sign in to comment.