Skip to content

Commit

Permalink
Use PostgreSQL nodes for scanning tables (duckdb#477)
Browse files Browse the repository at this point in the history
Idea is to reconstruct query based on duckdb filtering information, for
each table and use that information to plan postgres execution. This
plan will, potentially exeucute with parallel workers. If no workers are
available we will scan this local to thread.

This approach has advantage that it also will support all other scan
nodes that are available (and postgresql thinks are best - index/index
only/bitmap scan, also partitioned tables should be possible)

Fixes duckdb#243

---------

Co-authored-by: Yves <[email protected]>
Co-authored-by: Jelte Fennema-Nio <[email protected]>
  • Loading branch information
3 people authored and ritwizsinha committed Jan 11, 2025
1 parent 7cde7f6 commit 8743bb4
Show file tree
Hide file tree
Showing 34 changed files with 1,099 additions and 858 deletions.
29 changes: 10 additions & 19 deletions include/pgduckdb/catalog/pgduckdb_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,26 @@ namespace pgduckdb {

class PostgresTable : public duckdb::TableCatalogEntry {
public:
virtual ~PostgresTable();

public:
static Relation OpenRelation(Oid relid);
static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel);
static Cardinality GetTableCardinality(Relation rel);

protected:
PostgresTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info,
Relation rel, Cardinality cardinality, Snapshot snapshot);

protected:
Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

class PostgresHeapTable : public PostgresTable {
public:
PostgresHeapTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info,
Relation rel, Cardinality cardinality, Snapshot snapshot);
virtual ~PostgresTable();

public:
// -- Table API --
duckdb::unique_ptr<duckdb::BaseStatistics> GetStatistics(duckdb::ClientContext &context,
duckdb::column_t column_id) override;
duckdb::TableFunction GetScanFunction(duckdb::ClientContext &context,
duckdb::unique_ptr<duckdb::FunctionData> &bind_data) override;
duckdb::TableStorageInfo GetStorageInfo(duckdb::ClientContext &context) override;

public:
static Relation OpenRelation(Oid relid);
static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel);

protected:
Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace pgduckdb {
pd_prevent_errno_in_scope(); \
static_assert(elevel >= DEBUG5 && elevel <= WARNING_CLIENT_ONLY, "Invalid error level"); \
if (message_level_is_interesting(elevel)) { \
std::lock_guard<std::mutex> lock(DuckdbProcessLock::GetLock()); \
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock()); \
if (errstart(elevel, domain)) \
__VA_ARGS__, errfinish(__FILE__, __LINE__, __func__); \
} \
Expand Down
13 changes: 13 additions & 0 deletions include/pgduckdb/pg/declarations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,17 @@ struct TableAmRoutine;
typedef uint32_t CommandId;

typedef uint32_t SubTransactionId;

struct QueryDesc;

struct ParallelExecutorInfo;

struct MinimalTupleData;
typedef MinimalTupleData *MinimalTuple;

struct TupleQueueReader;

struct PlanState;

struct Plan;
}
11 changes: 9 additions & 2 deletions include/pgduckdb/pg/relations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ const char *GetAttName(const Form_pg_attribute);

Form_pg_attribute GetAttr(const TupleDesc tupleDesc, int i);

void EstimateRelSize(Relation rel, int32_t *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac);
bool TupleIsNull(TupleTableSlot *slot);

void SlotGetAllAttrs(TupleTableSlot *slot);

double EstimateRelSize(Relation rel);

Oid GetRelidFromSchemaAndTable(const char *, const char *);

bool IsValidOid(Oid);

bool IsValidBlockNumber(BlockNumber);

bool IsRelView(Relation);
char *GenerateQualifiedRelationName(Relation rel);
const char *QuoteIdentifier(const char *ident);

const char *GetRelationName(Relation rel);

} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/pgduckdb_guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ extern bool duckdb_enable_external_access;
extern bool duckdb_allow_unsigned_extensions;
extern bool duckdb_autoinstall_known_extensions;
extern bool duckdb_autoload_known_extensions;
extern int duckdb_max_threads_per_postgres_scan;
extern int duckdb_max_workers_per_postgres_scan;
extern char *duckdb_motherduck_postgres_database;
extern int duckdb_motherduck_enabled;
extern char *duckdb_motherduck_token;
Expand Down
4 changes: 2 additions & 2 deletions include/pgduckdb/pgduckdb_process_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace pgduckdb {

/*
* DuckdbProcessLock is used to synchronize calls to PG functions that modify global variables. Examples
* GlobalProcessLock is used to synchronize calls to PG functions that modify global variables. Examples
* for this synchronization are functions that read buffers/etc. This lock is shared between all threads and all
* replacement scans.
*/
struct DuckdbProcessLock {
struct GlobalProcessLock {
public:
static std::mutex &
GetLock() {
Expand Down
7 changes: 3 additions & 4 deletions include/pgduckdb/pgduckdb_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

namespace pgduckdb {

class PostgresScanGlobalState;
class PostgresScanLocalState;
struct PostgresScanGlobalState;
struct PostgresScanLocalState;

// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000
constexpr int32_t PGDUCKDB_DUCK_DATE_OFFSET = 10957;
Expand All @@ -21,7 +21,6 @@ int32_t GetPostgresDuckDBTypemod(const duckdb::LogicalType &type);
duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type);
void ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, uint64_t offset);
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, uint64_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, HeapTupleData *tuple);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot);

} // namespace pgduckdb
32 changes: 31 additions & 1 deletion include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ struct ErrorContextCallback;
struct MemoryContextData;

typedef struct MemoryContextData *MemoryContext;
typedef char *pg_stack_base_t;

extern sigjmp_buf *PG_exception_stack;
extern MemoryContext CurrentMemoryContext;
extern ErrorContextCallback *error_context_stack;
extern ErrorData *CopyErrorData();
extern void FlushErrorState();
extern pg_stack_base_t set_stack_base();
extern void restore_stack_base(pg_stack_base_t base);
}

namespace pgduckdb {
Expand All @@ -45,6 +48,33 @@ struct PgExceptionGuard {
ErrorContextCallback *_save_context_stack;
};

/*
* PostgresScopedStackReset is a RAII class that saves the current stack base
* and restores it on destruction. When calling certain Postgres C functions
* from other threads than the main thread this is necessary to avoid Postgres
* throwing an error running out of stack space. In codepaths that postgres
* expects to be called recursively it checks if the stack size is still within
* the limit set by max_stack_depth. It does so by comparing the current stack
* pointer to the pointer it saved when starting the process. But since
* different threads have different stacks, this check will fail basically
* automatically if the thread is not the main thread. This class is a
* workaround for this problem, by configuring a new stack base matching the
* current location of the stack. This does mean that the stack might grow
* higher than, but for our use case this shouldn't matter anyway because we
* don't expect any recursive functions to be called. And even if we did expect
* that, the default max_stack_depth is conservative enough to handle this small
* bit of extra stack space.
*/
struct PostgresScopedStackReset {
PostgresScopedStackReset() {
saved_current_stack = set_stack_base();
}
~PostgresScopedStackReset() {
restore_stack_base(saved_current_stack);
}
pg_stack_base_t saved_current_stack;
};

/*
* DuckdbGlobalLock should be held before calling.
*/
Expand Down Expand Up @@ -72,7 +102,7 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) {
}

#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, __VA_ARGS__)
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, ##__VA_ARGS__)

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query);

Expand Down
60 changes: 0 additions & 60 deletions include/pgduckdb/scan/heap_reader.hpp

This file was deleted.

96 changes: 61 additions & 35 deletions include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,78 @@
#include "pgduckdb/pg/declarations.hpp"
#include "pgduckdb/utility/allocator.hpp"

#include "pgduckdb/scan/postgres_table_reader.hpp"

#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include.

namespace pgduckdb {

class PostgresScanGlobalState {
public:
PostgresScanGlobalState() : m_snapshot(nullptr), m_count_tuples_only(false), m_total_row_count(0) {
// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState(Snapshot, Relation rel, duckdb::TableFunctionInitInput &input);
~PostgresScanGlobalState();
idx_t
MaxThreads() const override {
return 1;
}
void ConstructTableScanQuery(duckdb::TableFunctionInitInput &input);

void InitGlobalState(duckdb::TableFunctionInitInput &input);

void InitRelationMissingAttrs(TupleDesc tuple_desc);

Snapshot m_snapshot;
TupleDesc m_tuple_desc;
std::mutex m_lock; // Lock for one replacement scan
bool m_count_tuples_only;
/* Postgres column id to duckdb scanned index. The scanned index is DuckDB
* its scan order of the columns. */
std::vector<duckdb::pair<AttrNumber, duckdb::idx_t>> m_columns_to_scan;
/* These are indexed by the DuckDB scan index */
std::vector<duckdb::TableFilter *> m_column_filters;
/* Duckdb output vector idx with information about postgres column id */
duckdb::vector<duckdb::pair<duckdb::idx_t, AttrNumber>> m_output_columns;
std::atomic<std::uint32_t> m_total_row_count;
duckdb::map<int, Datum> m_relation_missing_attrs;
public:
Snapshot snapshot;
Relation rel;
TupleDesc table_tuple_desc;
bool count_tuples_only;
duckdb::vector<AttrNumber> output_columns;
std::atomic<std::uint32_t> total_row_count;
std::ostringstream scan_query;
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
};

class PostgresScanLocalState {
// Local State

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresScanLocalState(const PostgresScanGlobalState *psgs) : m_output_vector_size(0), m_exhausted_scan(false) {
if (!psgs->m_count_tuples_only) {
const auto s = psgs->m_columns_to_scan.size();
values.resize(s);
nulls.resize(s);
}
}
PostgresScanLocalState(PostgresScanGlobalState *global_state);
~PostgresScanLocalState() override;

public:
PostgresScanGlobalState *global_state;
size_t output_vector_size;
bool exhausted_scan;
};

uint32_t m_output_vector_size;
bool m_exhausted_scan;
std::vector<Datum, DuckDBMallocator<Datum>> values;
std::vector<bool, DuckDBMallocator<bool>> nulls;
// PostgresScanFunctionData

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot);
~PostgresScanFunctionData() override;

public:
duckdb::vector<duckdb::string> complex_filters;
Relation rel;
uint64_t cardinality;
Snapshot snapshot;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresReplacementScan(duckdb::ClientContext &context,
duckdb::ReplacementScanInput &input,
duckdb::optional_ptr<duckdb::ReplacementScanData> data);
// PostgresScanTableFunction

struct PostgresScanTableFunction : public duckdb::TableFunction {
public:
PostgresScanTableFunction();

public:
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
static void PostgresScanFunction(duckdb::ClientContext &context, duckdb::TableFunctionInput &data,
duckdb::DataChunk &output);
static duckdb::unique_ptr<duckdb::NodeStatistics> PostgresScanCardinality(duckdb::ClientContext &context,
const duckdb::FunctionData *data);
static std::string ToString(const duckdb::FunctionData *bind_data);
};

} // namespace pgduckdb
Loading

0 comments on commit 8743bb4

Please sign in to comment.