Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Start one background worker on first DuckDB query #544

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/pgduckdb/pgduckdb_background_worker.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#pragma once

void DuckdbInitBackgroundWorker(void);
void StartBackgroundWorkerIfNeeded(void);

namespace pgduckdb {

Expand Down
1 change: 0 additions & 1 deletion src/pgduckdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ _PG_init(void) {
DuckdbInitGUC();
DuckdbInitHooks();
DuckdbInitNode();
DuckdbInitBackgroundWorker();
pgduckdb::RegisterDuckdbXactCallback();
}
} // extern "C"
Expand Down
90 changes: 86 additions & 4 deletions src/pgduckdb_background_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "pgduckdb/utility/cpp_wrapper.hpp"
#include <string>
#include <unordered_map>
#include <fcntl.h>
#include <sys/file.h>
#include <unistd.h>

extern "C" {
#include "postgres.h"
Expand All @@ -25,6 +27,7 @@ extern "C" {
#include "miscadmin.h"
#include "pgstat.h"
#include "executor/spi.h"
#include "common/file_utils.h"
#include "postmaster/bgworker.h"
#include "postmaster/interrupt.h"
#include "storage/ipc.h"
Expand Down Expand Up @@ -57,10 +60,17 @@ static bool is_background_worker = false;
static std::unordered_map<std::string, std::string> last_known_motherduck_catalog_versions;
static uint64 initial_cache_version = 0;

// Forward declaration: the definition lives further down in this file, but
// pgduckdb_background_worker_main calls it before that point.
bool CanTakeLockForDatabase(Oid database_oid);

extern "C" {

PGDLLEXPORT void
pgduckdb_background_worker_main(Datum /* main_arg */) {
elog(LOG, "started pg_duckdb background worker");
if (!CanTakeLockForDatabase(0)) {
elog(LOG, "pg_duckdb background worker: could not take lock for database '%u'. Will exit.", 0);
return;
}
// Set up a signal handler for SIGTERM
pqsignal(SIGTERM, die);
BackgroundWorkerUnblockSignals();
Expand Down Expand Up @@ -124,9 +134,81 @@ force_motherduck_sync(PG_FUNCTION_ARGS) {
}
}

// Name of the pg_duckdb sync background worker. It is written into bgw_name when
// the worker is registered and compared against GetBackgroundWorkerTypeByPid()
// in HasBgwRunningForMyDatabase() to recognize an already-running worker.
constexpr const char *PGDUCKDB_SYNC_WORKER_NAME = "pg_duckdb sync worker";

bool
HasBgwRunningForMyDatabase() {
const auto num_backends = pgstat_fetch_stat_numbackends();
for (int backend_idx = 1; backend_idx <= num_backends; ++backend_idx) {
#if PG_VERSION_NUM >= 140000 && PG_VERSION_NUM < 160000
PgBackendStatus *beentry = pgstat_fetch_stat_beentry(backend_idx);
#else
LocalPgBackendStatus *local_beentry = pgstat_get_local_beentry_by_index(backend_idx);
PgBackendStatus *beentry = &local_beentry->backendStatus;
#endif
if (beentry->st_databaseid == InvalidOid) {
continue; // backend is not connected to a database
}

auto datid = ObjectIdGetDatum(beentry->st_databaseid);
if (datid != MyDatabaseId) {
continue; // backend is connected to a different database
}

auto backend_type = GetBackgroundWorkerTypeByPid(beentry->st_procpid);
if (!backend_type || strcmp(backend_type, PGDUCKDB_SYNC_WORKER_NAME) != 0) {
continue; // backend is not a pg_duckdb sync worker
}

return true;
}

return false;
}

/*
Attempts to take an exclusive, non-blocking flock() on the file
'<DataDir>/<PG_TEMP_FILE_PREFIX>.pgduckdb_worker.<database_oid>', creating the
file if it does not exist yet.

- Returns true if the lock was acquired. The file descriptor is intentionally
  left open in that case: closing it would release the lock, so it is kept for
  the remaining lifetime of the process.
- Returns false if the lock is currently held elsewhere (flock() failed with
  EWOULDBLOCK/EAGAIN). The descriptor is closed before returning.
- Raises an ERROR if the file cannot be opened or flock() fails for any other
  reason.
*/
bool
CanTakeLockForDatabase(Oid database_oid) {
	char lock_file_name[MAXPGPATH];
	// Oid is unsigned, hence %u.
	snprintf(lock_file_name, MAXPGPATH, "%s/%s.pgduckdb_worker.%u", DataDir, PG_TEMP_FILE_PREFIX, database_oid);

	const int fd = open(lock_file_name, O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
	if (fd < 0) {
		auto err = strerror(errno);
		elog(ERROR, "Could not take lock on file '%s': %s", lock_file_name, err);
	}

	// Take exclusive lock on the file. flock() returns 0 on success and -1 on
	// failure; the failure reason is reported through errno, not the return value.
	if (flock(fd, LOCK_EX | LOCK_NB) == 0) {
		return true;
	}

	// Save errno before close(), which may overwrite it.
	const int lock_errno = errno;
	close(fd);

	if (lock_errno == EWOULDBLOCK || lock_errno == EAGAIN) {
		return false; // somebody else already holds the lock
	}

	elog(ERROR, "Could not take lock on file '%s': %s", lock_file_name, strerror(lock_errno));
	return false; // not reached: elog(ERROR) does not return
}

/*
Will start the background worker if:
- MotherDuck is enabled (TODO: should be database-specific)
- it is not already running for the current PG database
*/
void
DuckdbInitBackgroundWorker(void) {
StartBackgroundWorkerIfNeeded(void) {
if (!pgduckdb::IsMotherDuckEnabledAnywhere()) {
elog(DEBUG3, "pg_duckdb background worker not started because MotherDuck is not enabled");
return;
}

if (HasBgwRunningForMyDatabase()) {
elog(DEBUG3, "pg_duckdb background worker already running for database %u", MyDatabaseId);
return;
}

Expand All @@ -137,12 +219,11 @@ DuckdbInitBackgroundWorker(void) {
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
snprintf(worker.bgw_library_name, BGW_MAXLEN, "pg_duckdb");
snprintf(worker.bgw_function_name, BGW_MAXLEN, "pgduckdb_background_worker_main");
snprintf(worker.bgw_name, BGW_MAXLEN, "pg_duckdb sync worker");
snprintf(worker.bgw_name, BGW_MAXLEN, PGDUCKDB_SYNC_WORKER_NAME);
worker.bgw_restart_time = 1;
worker.bgw_main_arg = (Datum)0;

// Register the worker
RegisterBackgroundWorker(&worker);
RegisterDynamicBackgroundWorker(&worker, NULL);
}

namespace pgduckdb {
Expand Down Expand Up @@ -603,6 +684,7 @@ SyncMotherDuckCatalogsWithPg_Cpp(bool drop_with_cascade) {
if (current_motherduck_catalog_version) {
pfree(current_motherduck_catalog_version);
}

current_motherduck_catalog_version = pstrdup(catalog_version.c_str());
MemoryContextSwitchTo(old_context);

Expand Down
6 changes: 6 additions & 0 deletions src/pgduckdb_metadata_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "pgduckdb/pgduckdb.h"
#include "pgduckdb/vendor/pg_list.hpp"
#include "pgduckdb/pgduckdb_metadata_cache.hpp"
#include "pgduckdb/pgduckdb_background_worker.hpp"
#include "pgduckdb/pgduckdb_guc.h"

namespace pgduckdb {
Expand Down Expand Up @@ -79,9 +80,11 @@ InvalidateCaches(Datum /*arg*/, int /*cache_id*/, uint32 hash_value) {
if (hash_value != schema_hash_value) {
return;
}

if (!cache.valid) {
return;
}

cache.valid = false;
if (cache.installed) {
list_free(cache.duckdb_only_functions);
Expand Down Expand Up @@ -172,6 +175,9 @@ IsExtensionRegistered() {
if (cache.installed) {
/* If the extension is installed we can build the rest of the cache */
BuildDuckdbOnlyFunctions();

StartBackgroundWorkerIfNeeded();

cache.table_am_oid = GetSysCacheOid1(AMNAME, Anum_pg_am_oid, CStringGetDatum("duckdb"));

cache.motherduck_postgres_database_oid = get_database_oid(duckdb_motherduck_postgres_database, false);
Expand Down