Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Background worker for collect metrics #13

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions metrics_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ namespace pgotel
{
otlp_exporter::OtlpGrpcMetricExporterOptions options;
std::unique_ptr<metric_sdk::MetricReader> reader;
bool initialized = false;

void
InitMetrics(std::string endpoint, std::chrono::milliseconds interval,
std::chrono::milliseconds timeout)
{
if (pgotel::initialized)
{
return;
}

pgotel::options.endpoint = endpoint;
auto exporter = otlp_exporter::OtlpGrpcMetricExporterFactory::Create(options);

Expand All @@ -43,13 +49,19 @@ namespace pgotel
std::shared_ptr<opentelemetry::metrics::MeterProvider> provider(std::move(u_provider));

metrics_api::Provider::SetMeterProvider(provider);

pgotel::initialized = true;
}

void
CleanupMetrics()
{
std::shared_ptr<metrics_api::MeterProvider> none;
metrics_api::Provider::SetMeterProvider(none);
if (pgotel::initialized)
{
std::shared_ptr<metrics_api::MeterProvider> none;
metrics_api::Provider::SetMeterProvider(none);
pgotel::initialized = false;
}
}

void
Expand All @@ -63,6 +75,10 @@ namespace pgotel
void
Counter(const std::string &name, double value, std::map<std::string, std::string> labels)
{
if (!initialized)
{
return;
}
std::string counter_name = "counter." + name;
auto provider = metrics_api::Provider::GetMeterProvider();
opentelemetry::nostd::shared_ptr<metrics_api::Meter> meter =
Expand Down
2 changes: 2 additions & 0 deletions metrics_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*-------------------------------------------------------------------------
*/

#pragma once

#include "opentelemetry/exporters/otlp/otlp_grpc_metric_exporter_factory.h"
#include "opentelemetry/metrics/provider.h"
#include "opentelemetry/sdk/metrics/export/periodic_exporting_metric_reader.h"
Expand Down
217 changes: 178 additions & 39 deletions pgotel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,42 @@ extern "C"
{
#endif
#include "postgres.h"

#include "fmgr.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/jsonb.h"
#include "utils/snapmgr.h"

/* Those are necessary for bgworker */
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/shmem.h"

PG_MODULE_MAGIC;

PG_FUNCTION_INFO_V1(pgotel_counter);

void _PG_init(void);
PGDLLEXPORT void pgotel_worker_main(Datum main_arg) pg_attribute_noreturn();

/* GUC variables */
static bool pgotel_enabled = false;
static int pgotel_interval = 2000;
static int pgotel_timeout = 500;
static char *pgotel_endpoint = NULL;
static int pgotel_worker_idle_time = 100;
static char *pgotel_dbname = NULL;

/* Signal handling */
static volatile sig_atomic_t got_sigterm = false;
static volatile sig_atomic_t got_sighup = false;

Datum
pgotel_counter(PG_FUNCTION_ARGS)
Expand Down Expand Up @@ -84,42 +102,35 @@ pgotel_counter(PG_FUNCTION_ARGS)
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid labels JSONB")));

/* Send the counter to the OTEL-Collector */
pgotel::InitMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(pgotel_timeout));
pgotel::Counter(counter_name, value, labels_map);

PG_RETURN_NULL();
}

static void
pgotel_assign_interval_guc(int newval, void *extra)
{
elog(DEBUG1, "pgotel_interval: %d", newval);
elog(DEBUG1, "pgotel_timeout: %d", pgotel_timeout);

pgotel::RestartMetrics(pgotel_endpoint,
std::chrono::milliseconds(newval),
std::chrono::milliseconds(pgotel_timeout));
}

static void
pgotel_assign_timeout_guc(int newval, void *extra)
{
elog(DEBUG1, "pgotel_interval: %d", pgotel_interval);
elog(DEBUG1, "pgotel_timeout: %d", newval);

pgotel::RestartMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(newval));
}

static void
load_params(void)
pgotel_define_gucs(void)
{
DefineCustomStringVariable("pgotel.endpoint",
"Endpoint of the OTEL-Collector",
NULL,
&pgotel_endpoint,
"localhost:4317",
PGC_USERSET,
PGC_SIGHUP,
0,
NULL,
NULL,
NULL);

DefineCustomStringVariable("pgotel.dbname",
"Database for the worker collect information to send to the "
"OTEL-Collector",
NULL,
&pgotel_dbname,
"postgres",
PGC_SIGHUP,
0,
NULL,
NULL,
Expand All @@ -144,10 +155,10 @@ load_params(void)
2000,
1000,
INT_MAX,
PGC_USERSET,
PGC_SIGHUP,
0,
NULL,
pgotel_assign_interval_guc,
NULL,
NULL);

DefineCustomIntVariable("pgotel.timeout",
Expand All @@ -157,35 +168,163 @@ load_params(void)
500,
100,
INT_MAX,
PGC_USERSET,
PGC_SIGHUP,
0,
NULL,
pgotel_assign_timeout_guc,
NULL,
NULL);
}

/*
* _PG_init
* Entry point loading hooks
*/
static void
pgotel_worker_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;

got_sigterm = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}

static void
pgotel_worker_sighup(SIGNAL_ARGS)
{
int save_errno = errno;

got_sighup = true;
if (MyProc)
SetLatch(&MyProc->procLatch);
errno = save_errno;
}

void
_PG_init(void)
pgotel_worker_main(Datum main_arg)
{
load_params();
elog(DEBUG1, "endpoint: %s", pgotel_endpoint);
StringInfoData buf;

/* Register functions for SIGTERM/SIGHUP management */
pqsignal(SIGHUP, pgotel_worker_sighup);
pqsignal(SIGTERM, pgotel_worker_sigterm);

/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();

/* Connect to a database */
BackgroundWorkerInitializeConnection(pgotel_dbname, NULL, 0);

elog(LOG, "pgotel_worker: started and connected to %s database", pgotel_dbname);

/* Build the query string */
initStringInfo(&buf);
appendStringInfo(&buf, "\
SELECT pgotel_counter( \
'idx_scan', idx_scan, \
jsonb_build_object('dbname', current_database(), 'schemaname', schemaname, 'relname', relname)) \
FROM pg_catalog.pg_stat_user_tables \
WHERE coalesce(idx_scan, 0) > 0 \
UNION ALL \
SELECT pgotel_counter( \
'seq_scan', seq_scan, \
jsonb_build_object('dbname', current_database(), 'schemaname', schemaname, 'relname', relname)) \
FROM pg_catalog.pg_stat_user_tables \
WHERE coalesce(seq_scan, 0) > 0");

/* Initialize OpenTelemetry Metrics SDK */
pgotel::InitMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(pgotel_timeout));

while (!got_sigterm)
{
/* Wait necessary amount of time */
WaitLatch(&MyProc->procLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
pgotel_worker_idle_time * 1L,
PG_WAIT_EXTENSION);
ResetLatch(&MyProc->procLatch);

/* Process signals */
if (got_sighup)
{
/* Process config file */
ProcessConfigFile(PGC_SIGHUP);
got_sighup = false;

/* Restart OpenTelemetry Metrics SDK */
pgotel::CleanupMetrics();
pgotel::InitMetrics(pgotel_endpoint,
std::chrono::milliseconds(pgotel_interval),
std::chrono::milliseconds(pgotel_timeout));

ereport(LOG, (errmsg("%s: processed SIGHUP", "pgotel_worker")));
}

if (got_sigterm)
{
/* Simply exit */
ereport(LOG, (errmsg("%s: processed SIGTERM", "pgotel_worker")));
break;
}

/* Update NOW() to return correct timestamp */
SetCurrentStatementStartTimestamp();

/* Show query status in pg_stat_activity */
pgstat_report_activity(STATE_RUNNING, "pgotel_worker_main");
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());

/* Execute NOTIFY requests */
int ret = SPI_execute(buf.data, false, 0);
if (ret != SPI_OK_SELECT)
elog(FATAL, "pgotel_worker: SPI_execute failed with error code %d", ret);
// elog(LOG, "pgotel_worker: executed " UINT64_FORMAT, SPI_processed);

/* Terminate transaction */
SPI_finish();
PopActiveSnapshot();

/* Notifications are sent by the transaction commit */
CommitTransactionCommand();

pgstat_report_activity(STATE_IDLE, NULL);
}

/* No problems, so clean exit */
pgotel::CleanupMetrics();
proc_exit(0);
}

static void
pgotel_register_worker()
{
BackgroundWorker worker;

/* Worker parameter and registration */
MemSet(&worker, 0, sizeof(BackgroundWorker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
snprintf(worker.bgw_library_name, BGW_MAXLEN, "pgotel");
snprintf(worker.bgw_function_name, BGW_MAXLEN, "pgotel_worker_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "%s", "PGOTEL Worker");

/* Wait 10 seconds for restart before crash */
worker.bgw_restart_time = 10;
worker.bgw_main_arg = (Datum) 0;
worker.bgw_notify_pid = 0;
RegisterBackgroundWorker(&worker);
}

/*
* _PG_fini
* Exit point unloading hooks
* _PG_init
* Entry point loading hooks
*/
void
_PG_fini(void)
_PG_init(void)
{
pgotel::CleanupMetrics();
pgotel_define_gucs();
pgotel_register_worker();
}

/* declaration out of file scope */
Expand Down