Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PostgreSQL nodes for scanning tables #477

Merged
merged 53 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
35ff478
Use PostgreSQL execution nodes to scan tables
mkaruza Nov 28, 2024
3bbeba1
Simple comment for calculation of wanted number of workers
mkaruza Dec 4, 2024
2a1d189
Don't allow partitioned table scan for now
mkaruza Dec 4, 2024
ddef887
Added GlobalProcessLatch
mkaruza Dec 6, 2024
79a7a25
Renamed DuckdbProcessLock to GlobalProcessLock
mkaruza Dec 6, 2024
d90aab5
Remove PostgresViewScan as it is not used anymore
mkaruza Dec 11, 2024
86e3206
Protect pg function with guard. Handle TEMP tables.
mkaruza Dec 13, 2024
41c9aea
Keep compiler quite for unused variables
mkaruza Dec 13, 2024
437426f
Minor header cleanup
mkaruza Dec 13, 2024
22d473a
Regression test for native postgres scan. Handle COUNT(*) in batches.
mkaruza Dec 14, 2024
9d4454f
Added small comment
mkaruza Dec 14, 2024
d6848b5
Remove last part of view replacement scan.
mkaruza Dec 14, 2024
8472d41
Small output fix
mkaruza Dec 14, 2024
e66cbbb
Forgot to include regression test in schedule
mkaruza Dec 14, 2024
e5e2a97
Rename variable names so they are not shadowed
mkaruza Dec 14, 2024
8d2151f
Don't cancel interrupts if not needed. Python fix tests.
mkaruza Dec 14, 2024
05e1e14
Changes for Postgres 14
mkaruza Dec 14, 2024
9acd30c
Remove unused attribute and instead don't name unused input argument
mkaruza Dec 16, 2024
a33662d
PR review changes. Cleanup scan, if possible, once the scan is
mkaruza Dec 17, 2024
79cc554
Allow FDW tables and paritioned tables
mkaruza Dec 18, 2024
c850250
Fix Mac build
Y-- Dec 18, 2024
fed5f37
Quote column name
Y-- Dec 18, 2024
076f47a
Vendor `RELKIND_HAS_TABLE_AM` for PG14
Y-- Dec 18, 2024
0e42b9e
Calculate number of parallel workers on scan for count_tuples_only
mkaruza Dec 19, 2024
c92901d
Fixed regression test for previous commit
mkaruza Dec 19, 2024
06c5375
Refactor logic for WaitGlobalLatch
mkaruza Dec 20, 2024
5aaa154
Simplify latch waiting logic
JelteF Dec 20, 2024
5da2802
Add comment to PostgresScopedStackReset
JelteF Dec 20, 2024
7c5ab0d
clang-format off around vendored code
JelteF Dec 20, 2024
fe6b73b
Move ConstructFullyQualifiedTableName to relations.cpp
JelteF Dec 20, 2024
1db3d34
Remove accidental lock
JelteF Dec 20, 2024
bad8e19
Move QuoteIdentifier to relations.cpp
JelteF Dec 20, 2024
534c608
Undo accidental removal
JelteF Dec 20, 2024
6767f96
Make postgres_scan.cpp c++ only
JelteF Dec 20, 2024
45c5ab4
Take GlobalProcessLock for the duration of a chunk
JelteF Dec 20, 2024
77df0e1
GlobalLock held for duration of populating output vector
mkaruza Dec 23, 2024
9eed6af
Use returned tuple slot attr information for populating output vector
mkaruza Dec 23, 2024
abb086f
Rename GUC variable according to PR suggestion
mkaruza Dec 23, 2024
aa42e61
Remove check for allowed table types
mkaruza Dec 23, 2024
60f2028
Fix typo
JelteF Jan 2, 2025
5d560df
Simplify max_workers_per_postgres_scan
mkaruza Jan 7, 2025
94f60af
Update empty comment
mkaruza Jan 7, 2025
68bb7a6
Handle cleanup correctly in case of stopped parallel workers
JelteF Jan 7, 2025
e0135e8
Fix formatting
JelteF Jan 7, 2025
0b6c8ec
Simplify PostgresTableReader cleanup logic
JelteF Jan 9, 2025
431e5fb
Add the BLOB alias for bytea, which duckdb uses
JelteF Jan 9, 2025
d69c222
Fix null deref if execution finished before cancel is processed
JelteF Jan 10, 2025
a7274dd
Return table name when stringifying TF
Y-- Jan 10, 2025
6e9a2e9
Cleanup
Y-- Jan 10, 2025
c5122db
Format
Y-- Jan 10, 2025
2883867
Update duckdb_recycle.out
Y-- Jan 10, 2025
e79a99a
Update tests
Y-- Jan 10, 2025
7908a9a
Keep `POSTGRES_SCAN` in the output
Y-- Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 10 additions & 19 deletions include/pgduckdb/catalog/pgduckdb_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,26 @@ namespace pgduckdb {

class PostgresTable : public duckdb::TableCatalogEntry {
public:
virtual ~PostgresTable();

public:
static Relation OpenRelation(Oid relid);
static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel);
static Cardinality GetTableCardinality(Relation rel);

protected:
PostgresTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info,
mkaruza marked this conversation as resolved.
Show resolved Hide resolved
Relation rel, Cardinality cardinality, Snapshot snapshot);

protected:
Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

class PostgresHeapTable : public PostgresTable {
public:
PostgresHeapTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info,
Relation rel, Cardinality cardinality, Snapshot snapshot);
virtual ~PostgresTable();

public:
// -- Table API --
duckdb::unique_ptr<duckdb::BaseStatistics> GetStatistics(duckdb::ClientContext &context,
duckdb::column_t column_id) override;
duckdb::TableFunction GetScanFunction(duckdb::ClientContext &context,
duckdb::unique_ptr<duckdb::FunctionData> &bind_data) override;
duckdb::TableStorageInfo GetStorageInfo(duckdb::ClientContext &context) override;

public:
static Relation OpenRelation(Oid relid);
static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel);

protected:
Relation rel;
Cardinality cardinality;
Snapshot snapshot;
};

} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/logger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ namespace pgduckdb {
pd_prevent_errno_in_scope(); \
static_assert(elevel >= DEBUG5 && elevel <= WARNING_CLIENT_ONLY, "Invalid error level"); \
if (message_level_is_interesting(elevel)) { \
std::lock_guard<std::mutex> lock(DuckdbProcessLock::GetLock()); \
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock()); \
if (errstart(elevel, domain)) \
__VA_ARGS__, errfinish(__FILE__, __LINE__, __func__); \
} \
Expand Down
13 changes: 13 additions & 0 deletions include/pgduckdb/pg/declarations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,17 @@ struct TableAmRoutine;
typedef uint32_t CommandId;

typedef uint32_t SubTransactionId;

struct QueryDesc;

struct ParallelExecutorInfo;

struct MinimalTupleData;
typedef MinimalTupleData *MinimalTuple;

struct TupleQueueReader;

struct PlanState;

struct Plan;
}
11 changes: 9 additions & 2 deletions include/pgduckdb/pg/relations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ const char *GetAttName(const Form_pg_attribute);

Form_pg_attribute GetAttr(const TupleDesc tupleDesc, int i);

void EstimateRelSize(Relation rel, int32_t *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac);
bool TupleIsNull(TupleTableSlot *slot);

void SlotGetAllAttrs(TupleTableSlot *slot);

double EstimateRelSize(Relation rel);

Oid GetRelidFromSchemaAndTable(const char *, const char *);

bool IsValidOid(Oid);

bool IsValidBlockNumber(BlockNumber);

bool IsRelView(Relation);
char *GenerateQualifiedRelationName(Relation rel);
const char *QuoteIdentifier(const char *ident);

const char *GetRelationName(Relation rel);

} // namespace pgduckdb
2 changes: 1 addition & 1 deletion include/pgduckdb/pgduckdb_guc.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ extern bool duckdb_enable_external_access;
extern bool duckdb_allow_unsigned_extensions;
extern bool duckdb_autoinstall_known_extensions;
extern bool duckdb_autoload_known_extensions;
extern int duckdb_max_threads_per_postgres_scan;
extern int duckdb_max_workers_per_postgres_scan;
extern char *duckdb_motherduck_postgres_database;
extern int duckdb_motherduck_enabled;
extern char *duckdb_motherduck_token;
Expand Down
4 changes: 2 additions & 2 deletions include/pgduckdb/pgduckdb_process_lock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
namespace pgduckdb {

/*
* DuckdbProcessLock is used to synchronize calls to PG functions that modify global variables. Examples
* GlobalProcessLock is used to synchronize calls to PG functions that modify global variables. Examples
* for this synchronization are functions that read buffers/etc. This lock is shared between all threads and all
* replacement scans.
*/
struct DuckdbProcessLock {
struct GlobalProcessLock {
public:
static std::mutex &
GetLock() {
Expand Down
7 changes: 3 additions & 4 deletions include/pgduckdb/pgduckdb_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

namespace pgduckdb {

class PostgresScanGlobalState;
class PostgresScanLocalState;
struct PostgresScanGlobalState;
struct PostgresScanLocalState;

// DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000
constexpr int32_t PGDUCKDB_DUCK_DATE_OFFSET = 10957;
Expand All @@ -21,7 +21,6 @@ int32_t GetPostgresDuckDBTypemod(const duckdb::LogicalType &type);
duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type);
void ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, uint64_t offset);
bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, uint64_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr<PostgresScanGlobalState> scan_global_state,
duckdb::shared_ptr<PostgresScanLocalState> scan_local_state, HeapTupleData *tuple);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot);

} // namespace pgduckdb
32 changes: 31 additions & 1 deletion include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ struct ErrorContextCallback;
struct MemoryContextData;

typedef struct MemoryContextData *MemoryContext;
typedef char *pg_stack_base_t;

extern sigjmp_buf *PG_exception_stack;
extern MemoryContext CurrentMemoryContext;
extern ErrorContextCallback *error_context_stack;
extern ErrorData *CopyErrorData();
extern void FlushErrorState();
extern pg_stack_base_t set_stack_base();
extern void restore_stack_base(pg_stack_base_t base);
}

namespace pgduckdb {
Expand All @@ -45,6 +48,33 @@ struct PgExceptionGuard {
ErrorContextCallback *_save_context_stack;
};

/*
* PostgresScopedStackReset is a RAII class that saves the current stack base
* and restores it on destruction. When calling certain Postgres C functions
* from other threads than the main thread this is necessary to avoid Postgres
* throwing an error running out of stack space. In codepaths that postgres
* expects to be called recursively it checks if the stack size is still within
* the limit set by max_stack_depth. It does so by comparing the current stack
* pointer to the pointer it saved when starting the process. But since
* different threads have different stacks, this check will fail basically
* automatically if the thread is not the main thread. This class is a
* workaround for this problem, by configuring a new stack base matching the
* current location of the stack. This does mean that the stack might grow
* higher than, but for our use case this shouldn't matter anyway because we
* don't expect any recursive functions to be called. And even if we did expect
* that, the default max_stack_depth is conservative enough to handle this small
* bit of extra stack space.
*/
struct PostgresScopedStackReset {
PostgresScopedStackReset() {
saved_current_stack = set_stack_base();
}
~PostgresScopedStackReset() {
restore_stack_base(saved_current_stack);
}
pg_stack_base_t saved_current_stack;
};

/*
* DuckdbGlobalLock should be held before calling.
*/
Expand Down Expand Up @@ -72,7 +102,7 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) {
}

#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, __VA_ARGS__)
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, ##__VA_ARGS__)

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query);

Expand Down
60 changes: 0 additions & 60 deletions include/pgduckdb/scan/heap_reader.hpp

This file was deleted.

96 changes: 61 additions & 35 deletions include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,52 +5,78 @@
#include "pgduckdb/pg/declarations.hpp"
#include "pgduckdb/utility/allocator.hpp"

#include "pgduckdb/scan/postgres_table_reader.hpp"

#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include.

namespace pgduckdb {

class PostgresScanGlobalState {
public:
PostgresScanGlobalState() : m_snapshot(nullptr), m_count_tuples_only(false), m_total_row_count(0) {
// Global State

struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresScanGlobalState(Snapshot, Relation rel, duckdb::TableFunctionInitInput &input);
~PostgresScanGlobalState();
idx_t
MaxThreads() const override {
return 1;
}
void ConstructTableScanQuery(duckdb::TableFunctionInitInput &input);

void InitGlobalState(duckdb::TableFunctionInitInput &input);

void InitRelationMissingAttrs(TupleDesc tuple_desc);

Snapshot m_snapshot;
TupleDesc m_tuple_desc;
std::mutex m_lock; // Lock for one replacement scan
bool m_count_tuples_only;
/* Postgres column id to duckdb scanned index. The scanned index is DuckDB
* its scan order of the columns. */
std::vector<duckdb::pair<AttrNumber, duckdb::idx_t>> m_columns_to_scan;
/* These are indexed by the DuckDB scan index */
std::vector<duckdb::TableFilter *> m_column_filters;
/* Duckdb output vector idx with information about postgres column id */
duckdb::vector<duckdb::pair<duckdb::idx_t, AttrNumber>> m_output_columns;
std::atomic<std::uint32_t> m_total_row_count;
duckdb::map<int, Datum> m_relation_missing_attrs;
public:
Snapshot snapshot;
Relation rel;
TupleDesc table_tuple_desc;
bool count_tuples_only;
duckdb::vector<AttrNumber> output_columns;
std::atomic<std::uint32_t> total_row_count;
std::ostringstream scan_query;
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
};

class PostgresScanLocalState {
// Local State

struct PostgresScanLocalState : public duckdb::LocalTableFunctionState {
public:
PostgresScanLocalState(const PostgresScanGlobalState *psgs) : m_output_vector_size(0), m_exhausted_scan(false) {
if (!psgs->m_count_tuples_only) {
const auto s = psgs->m_columns_to_scan.size();
values.resize(s);
nulls.resize(s);
}
}
PostgresScanLocalState(PostgresScanGlobalState *global_state);
~PostgresScanLocalState() override;

public:
mkaruza marked this conversation as resolved.
Show resolved Hide resolved
PostgresScanGlobalState *global_state;
size_t output_vector_size;
bool exhausted_scan;
};

uint32_t m_output_vector_size;
bool m_exhausted_scan;
std::vector<Datum, DuckDBMallocator<Datum>> values;
std::vector<bool, DuckDBMallocator<bool>> nulls;
// PostgresScanFunctionData

struct PostgresScanFunctionData : public duckdb::TableFunctionData {
public:
PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot);
~PostgresScanFunctionData() override;

public:
duckdb::vector<duckdb::string> complex_filters;
Relation rel;
uint64_t cardinality;
Snapshot snapshot;
};

duckdb::unique_ptr<duckdb::TableRef> PostgresReplacementScan(duckdb::ClientContext &context,
duckdb::ReplacementScanInput &input,
duckdb::optional_ptr<duckdb::ReplacementScanData> data);
// PostgresScanTableFunction

struct PostgresScanTableFunction : public duckdb::TableFunction {
public:
PostgresScanTableFunction();

public:
static duckdb::unique_ptr<duckdb::GlobalTableFunctionState>
PostgresScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input);
static duckdb::unique_ptr<duckdb::LocalTableFunctionState>
PostgresScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input,
duckdb::GlobalTableFunctionState *gstate);
static void PostgresScanFunction(duckdb::ClientContext &context, duckdb::TableFunctionInput &data,
duckdb::DataChunk &output);
static duckdb::unique_ptr<duckdb::NodeStatistics> PostgresScanCardinality(duckdb::ClientContext &context,
const duckdb::FunctionData *data);
static std::string ToString(const duckdb::FunctionData *bind_data);
};

} // namespace pgduckdb
Loading
Loading