diff --git a/include/pgduckdb/catalog/pgduckdb_table.hpp b/include/pgduckdb/catalog/pgduckdb_table.hpp index b5d1b186..521f559e 100644 --- a/include/pgduckdb/catalog/pgduckdb_table.hpp +++ b/include/pgduckdb/catalog/pgduckdb_table.hpp @@ -11,35 +11,26 @@ namespace pgduckdb { class PostgresTable : public duckdb::TableCatalogEntry { public: - virtual ~PostgresTable(); - -public: - static Relation OpenRelation(Oid relid); - static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel); - static Cardinality GetTableCardinality(Relation rel); - -protected: PostgresTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info, Relation rel, Cardinality cardinality, Snapshot snapshot); -protected: - Relation rel; - Cardinality cardinality; - Snapshot snapshot; -}; - -class PostgresHeapTable : public PostgresTable { -public: - PostgresHeapTable(duckdb::Catalog &catalog, duckdb::SchemaCatalogEntry &schema, duckdb::CreateTableInfo &info, - Relation rel, Cardinality cardinality, Snapshot snapshot); + virtual ~PostgresTable(); public: - // -- Table API -- duckdb::unique_ptr GetStatistics(duckdb::ClientContext &context, duckdb::column_t column_id) override; duckdb::TableFunction GetScanFunction(duckdb::ClientContext &context, duckdb::unique_ptr &bind_data) override; duckdb::TableStorageInfo GetStorageInfo(duckdb::ClientContext &context) override; + +public: + static Relation OpenRelation(Oid relid); + static void SetTableInfo(duckdb::CreateTableInfo &info, Relation rel); + +protected: + Relation rel; + Cardinality cardinality; + Snapshot snapshot; }; } // namespace pgduckdb diff --git a/include/pgduckdb/logger.hpp b/include/pgduckdb/logger.hpp index 3ce6c5e0..67ae9ce8 100644 --- a/include/pgduckdb/logger.hpp +++ b/include/pgduckdb/logger.hpp @@ -46,7 +46,7 @@ namespace pgduckdb { pd_prevent_errno_in_scope(); \ static_assert(elevel >= DEBUG5 && elevel <= WARNING_CLIENT_ONLY, "Invalid error level"); \ if (message_level_is_interesting(elevel)) { \ - std::lock_guard lock(DuckdbProcessLock::GetLock()); \ + std::lock_guard lock(GlobalProcessLock::GetLock()); \ if (errstart(elevel, domain)) \ __VA_ARGS__, errfinish(__FILE__, __LINE__, __func__); \ } \ diff --git a/include/pgduckdb/pg/declarations.hpp b/include/pgduckdb/pg/declarations.hpp index fb7a4043..46d1a6ee 100644 --- a/include/pgduckdb/pg/declarations.hpp +++ b/include/pgduckdb/pg/declarations.hpp @@ -66,4 +66,17 @@ struct TableAmRoutine; typedef uint32_t CommandId; typedef uint32_t SubTransactionId; + +struct QueryDesc; + +struct ParallelExecutorInfo; + +struct MinimalTupleData; +typedef MinimalTupleData *MinimalTuple; + +struct TupleQueueReader; + +struct PlanState; + +struct Plan; } diff --git a/include/pgduckdb/pg/relations.hpp b/include/pgduckdb/pg/relations.hpp index e4fd62a7..8a81fbc9 100644 --- a/include/pgduckdb/pg/relations.hpp +++ b/include/pgduckdb/pg/relations.hpp @@ -18,7 +18,11 @@ const char *GetAttName(const Form_pg_attribute); Form_pg_attribute GetAttr(const TupleDesc tupleDesc, int i); -void EstimateRelSize(Relation rel, int32_t *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac); +bool TupleIsNull(TupleTableSlot *slot); + +void SlotGetAllAttrs(TupleTableSlot *slot); + +double EstimateRelSize(Relation rel); Oid GetRelidFromSchemaAndTable(const char *, const char *); @@ -26,6 +30,9 @@ bool IsValidOid(Oid); bool IsValidBlockNumber(BlockNumber); -bool IsRelView(Relation); +char *GenerateQualifiedRelationName(Relation rel); +const char *QuoteIdentifier(const char *ident); + +const char *GetRelationName(Relation rel); } // namespace pgduckdb diff --git a/include/pgduckdb/pgduckdb_guc.h b/include/pgduckdb/pgduckdb_guc.h index b1e0664d..629de5ac 100644 --- a/include/pgduckdb/pgduckdb_guc.h +++ b/include/pgduckdb/pgduckdb_guc.h @@ -8,7 +8,7 @@ extern bool duckdb_enable_external_access; extern bool duckdb_allow_unsigned_extensions; extern bool duckdb_autoinstall_known_extensions; extern bool duckdb_autoload_known_extensions; -extern int duckdb_max_threads_per_postgres_scan; +extern int duckdb_max_workers_per_postgres_scan; extern char *duckdb_motherduck_postgres_database; extern int duckdb_motherduck_enabled; extern char *duckdb_motherduck_token; diff --git a/include/pgduckdb/pgduckdb_process_lock.hpp b/include/pgduckdb/pgduckdb_process_lock.hpp index 45947c98..dff227fe 100644 --- a/include/pgduckdb/pgduckdb_process_lock.hpp +++ b/include/pgduckdb/pgduckdb_process_lock.hpp @@ -5,11 +5,11 @@ namespace pgduckdb { /* - * DuckdbProcessLock is used to synchronize calls to PG functions that modify global variables. Examples + * GlobalProcessLock is used to synchronize calls to PG functions that modify global variables. Examples * for this synchronization are functions that read buffers/etc. This lock is shared between all threads and all * replacement scans. */ -struct DuckdbProcessLock { +struct GlobalProcessLock { public: static std::mutex & GetLock() { diff --git a/include/pgduckdb/pgduckdb_types.hpp b/include/pgduckdb/pgduckdb_types.hpp index 97028a46..ebc7622d 100644 --- a/include/pgduckdb/pgduckdb_types.hpp +++ b/include/pgduckdb/pgduckdb_types.hpp @@ -7,8 +7,8 @@ namespace pgduckdb { -class PostgresScanGlobalState; -class PostgresScanLocalState; +struct PostgresScanGlobalState; +struct PostgresScanLocalState; // DuckDB has date starting from 1/1/1970 while PG starts from 1/1/2000 constexpr int32_t PGDUCKDB_DUCK_DATE_OFFSET = 10957; @@ -21,7 +21,6 @@ int32_t GetPostgresDuckDBTypemod(const duckdb::LogicalType &type); duckdb::Value ConvertPostgresParameterToDuckValue(Datum value, Oid postgres_type); void ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, uint64_t offset); bool ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, uint64_t col); -void InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr scan_global_state, - duckdb::shared_ptr scan_local_state, HeapTupleData *tuple); +void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot); } // namespace pgduckdb diff --git a/include/pgduckdb/pgduckdb_utils.hpp b/include/pgduckdb/pgduckdb_utils.hpp index 9787707e..dfe74c1a 100644 --- a/include/pgduckdb/pgduckdb_utils.hpp +++ b/include/pgduckdb/pgduckdb_utils.hpp @@ -17,12 +17,15 @@ struct ErrorContextCallback; struct MemoryContextData; typedef struct MemoryContextData *MemoryContext; +typedef char *pg_stack_base_t; extern sigjmp_buf *PG_exception_stack; extern MemoryContext CurrentMemoryContext; extern ErrorContextCallback *error_context_stack; extern ErrorData *CopyErrorData(); extern void FlushErrorState(); +extern pg_stack_base_t set_stack_base(); +extern void restore_stack_base(pg_stack_base_t base); } namespace pgduckdb { @@ -45,6 +48,33 @@ struct PgExceptionGuard { ErrorContextCallback *_save_context_stack; }; +/* + * PostgresScopedStackReset is a RAII class that saves the current stack base + * and restores it on destruction. When calling certain Postgres C functions + * from other threads than the main thread this is necessary to avoid Postgres + * throwing an error running out of stack space. In codepaths that postgres + * expects to be called recursively it checks if the stack size is still within + * the limit set by max_stack_depth. It does so by comparing the current stack + * pointer to the pointer it saved when starting the process. But since + * different threads have different stacks, this check will fail basically + * automatically if the thread is not the main thread. This class is a + * workaround for this problem, by configuring a new stack base matching the + * current location of the stack. This does mean that the stack might grow + * higher than, but for our use case this shouldn't matter anyway because we + * don't expect any recursive functions to be called. And even if we did expect + * that, the default max_stack_depth is conservative enough to handle this small + * bit of extra stack space. + */ +struct PostgresScopedStackReset { + PostgresScopedStackReset() { + saved_current_stack = set_stack_base(); + } + ~PostgresScopedStackReset() { + restore_stack_base(saved_current_stack); + } + pg_stack_base_t saved_current_stack; +}; + /* * DuckdbGlobalLock should be held before calling. */ @@ -72,7 +102,7 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) { } #define PostgresFunctionGuard(FUNC, ...) \ - pgduckdb::__PostgresFunctionGuard__(__func__, __VA_ARGS__) + pgduckdb::__PostgresFunctionGuard__(__func__, ##__VA_ARGS__) duckdb::unique_ptr DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query); diff --git a/include/pgduckdb/scan/heap_reader.hpp b/include/pgduckdb/scan/heap_reader.hpp deleted file mode 100644 index a077dd13..00000000 --- a/include/pgduckdb/scan/heap_reader.hpp +++ /dev/null @@ -1,60 +0,0 @@ -#pragma once - -#include "duckdb.hpp" - -#include "pgduckdb/scan/postgres_scan.hpp" -#include "pgduckdb/pg/declarations.hpp" - -#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. - -namespace pgduckdb { - -// HeapReaderGlobalState - -class HeapReaderGlobalState { -public: - HeapReaderGlobalState(Relation rel); - BlockNumber AssignNextBlockNumber(std::mutex &lock); - -private: - BlockNumber m_nblocks; - BlockNumber m_last_assigned_block_number; -}; - -// HeapReader - -class HeapReader { -public: - HeapReader(Relation rel, duckdb::shared_ptr heap_reader_global_state, - duckdb::shared_ptr global_state, - duckdb::shared_ptr local_state); - ~HeapReader(); - HeapReader(const HeapReader &other) = delete; - HeapReader &operator=(const HeapReader &other) = delete; - HeapReader &operator=(HeapReader &&other) = delete; - HeapReader(HeapReader &&other) = delete; - bool ReadPageTuples(duckdb::DataChunk &output); - BlockNumber - GetCurrentBlockNumber() { - return m_block_number; - } - -private: - Page PreparePageRead(); - - duckdb::shared_ptr m_global_state; - duckdb::shared_ptr m_heap_reader_global_state; - duckdb::shared_ptr m_local_state; - Relation m_rel; - bool m_inited; - bool m_read_next_page; - bool m_page_tuples_all_visible; - BlockNumber m_block_number; - Buffer m_buffer; - OffsetNumber m_current_tuple_index; - int m_page_tuples_left; - duckdb::unique_ptr m_tuple; - BufferAccessStrategy m_buffer_access_strategy; -}; - -} // namespace pgduckdb diff --git a/include/pgduckdb/scan/postgres_scan.hpp b/include/pgduckdb/scan/postgres_scan.hpp index 85fdf86e..3f4d0171 100644 --- a/include/pgduckdb/scan/postgres_scan.hpp +++ b/include/pgduckdb/scan/postgres_scan.hpp @@ -5,52 +5,78 @@ #include "pgduckdb/pg/declarations.hpp" #include "pgduckdb/utility/allocator.hpp" +#include "pgduckdb/scan/postgres_table_reader.hpp" + #include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. namespace pgduckdb { -class PostgresScanGlobalState { -public: - PostgresScanGlobalState() : m_snapshot(nullptr), m_count_tuples_only(false), m_total_row_count(0) { +// Global State + +struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState { + explicit PostgresScanGlobalState(Snapshot, Relation rel, duckdb::TableFunctionInitInput &input); + ~PostgresScanGlobalState(); + idx_t + MaxThreads() const override { + return 1; } + void ConstructTableScanQuery(duckdb::TableFunctionInitInput &input); - void InitGlobalState(duckdb::TableFunctionInitInput &input); - - void InitRelationMissingAttrs(TupleDesc tuple_desc); - - Snapshot m_snapshot; - TupleDesc m_tuple_desc; - std::mutex m_lock; // Lock for one replacement scan - bool m_count_tuples_only; - /* Postgres column id to duckdb scanned index. The scanned index is DuckDB - * its scan order of the columns. */ - std::vector> m_columns_to_scan; - /* These are indexed by the DuckDB scan index */ - std::vector m_column_filters; - /* Duckdb output vector idx with information about postgres column id */ - duckdb::vector> m_output_columns; - std::atomic m_total_row_count; - duckdb::map m_relation_missing_attrs; +public: + Snapshot snapshot; + Relation rel; + TupleDesc table_tuple_desc; + bool count_tuples_only; + duckdb::vector output_columns; + std::atomic total_row_count; + std::ostringstream scan_query; + duckdb::shared_ptr table_reader_global_state; }; -class PostgresScanLocalState { +// Local State + +struct PostgresScanLocalState : public duckdb::LocalTableFunctionState { public: - PostgresScanLocalState(const PostgresScanGlobalState *psgs) : m_output_vector_size(0), m_exhausted_scan(false) { - if (!psgs->m_count_tuples_only) { - const auto s = psgs->m_columns_to_scan.size(); - values.resize(s); - nulls.resize(s); - } - } + PostgresScanLocalState(PostgresScanGlobalState *global_state); + ~PostgresScanLocalState() override; + +public: + PostgresScanGlobalState *global_state; + size_t output_vector_size; + bool exhausted_scan; +}; - uint32_t m_output_vector_size; - bool m_exhausted_scan; - std::vector> values; - std::vector> nulls; +// PostgresScanFunctionData + +struct PostgresScanFunctionData : public duckdb::TableFunctionData { +public: + PostgresScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot); + ~PostgresScanFunctionData() override; + +public: + duckdb::vector complex_filters; + Relation rel; + uint64_t cardinality; + Snapshot snapshot; }; -duckdb::unique_ptr PostgresReplacementScan(duckdb::ClientContext &context, - duckdb::ReplacementScanInput &input, - duckdb::optional_ptr data); +// PostgresScanTableFunction + +struct PostgresScanTableFunction : public duckdb::TableFunction { +public: + PostgresScanTableFunction(); + +public: + static duckdb::unique_ptr + PostgresScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); + static duckdb::unique_ptr + PostgresScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, + duckdb::GlobalTableFunctionState *gstate); + static void PostgresScanFunction(duckdb::ClientContext &context, duckdb::TableFunctionInput &data, + duckdb::DataChunk &output); + static duckdb::unique_ptr PostgresScanCardinality(duckdb::ClientContext &context, + const duckdb::FunctionData *data); + static std::string ToString(const duckdb::FunctionData *bind_data); +}; } // namespace pgduckdb diff --git a/include/pgduckdb/scan/postgres_seq_scan.hpp b/include/pgduckdb/scan/postgres_seq_scan.hpp deleted file mode 100644 index 287ee0f6..00000000 --- a/include/pgduckdb/scan/postgres_seq_scan.hpp +++ /dev/null @@ -1,78 +0,0 @@ -#pragma once - -#include "duckdb.hpp" - -#include "pgduckdb/pgduckdb_guc.h" -#include "pgduckdb/pg/declarations.hpp" - -#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. - -namespace pgduckdb { - -class HeapReaderGlobalState; -class HeapReader; -class PostgresScanGlobalState; -class PostgresScanLocalState; - -// Global State - -struct PostgresSeqScanGlobalState : public duckdb::GlobalTableFunctionState { - explicit PostgresSeqScanGlobalState(Relation rel, duckdb::TableFunctionInitInput &input); - ~PostgresSeqScanGlobalState(); - idx_t - MaxThreads() const override { - return duckdb_max_threads_per_postgres_scan; - } - -public: - duckdb::shared_ptr m_global_state; - duckdb::shared_ptr m_heap_reader_global_state; - Relation m_rel; -}; - -// Local State - -struct PostgresSeqScanLocalState : public duckdb::LocalTableFunctionState { -public: - PostgresSeqScanLocalState(Relation rel, duckdb::shared_ptr heap_reader_global_state, - duckdb::shared_ptr global_state); - ~PostgresSeqScanLocalState() override; - -public: - duckdb::shared_ptr m_local_state; - duckdb::unique_ptr m_heap_table_reader; -}; - -// PostgresSeqScanFunctionData - -struct PostgresSeqScanFunctionData : public duckdb::TableFunctionData { -public: - PostgresSeqScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot); - ~PostgresSeqScanFunctionData() override; - -public: - Relation m_rel; - uint64_t m_cardinality; - Snapshot m_snapshot; -}; - -// PostgresSeqScanFunction - -struct PostgresSeqScanFunction : public duckdb::TableFunction { -public: - PostgresSeqScanFunction(); - -public: - static duckdb::unique_ptr - PostgresSeqScanInitGlobal(duckdb::ClientContext &context, duckdb::TableFunctionInitInput &input); - static duckdb::unique_ptr - PostgresSeqScanInitLocal(duckdb::ExecutionContext &context, duckdb::TableFunctionInitInput &input, - duckdb::GlobalTableFunctionState *gstate); - static void PostgresSeqScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data, - duckdb::DataChunk &output); - - static duckdb::unique_ptr PostgresSeqScanCardinality(duckdb::ClientContext &context, - const duckdb::FunctionData *data); -}; - -} // namespace pgduckdb diff --git a/include/pgduckdb/scan/postgres_table_reader.hpp b/include/pgduckdb/scan/postgres_table_reader.hpp new file mode 100644 index 00000000..88998188 --- /dev/null +++ b/include/pgduckdb/scan/postgres_table_reader.hpp @@ -0,0 +1,40 @@ +#pragma once + +#include "duckdb.hpp" + +#include "pgduckdb/pg/declarations.hpp" + +#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. + +namespace pgduckdb { + +// PostgresTableReader + +class PostgresTableReader { +public: + PostgresTableReader(const char *table_scan_query, bool count_tuples_only); + ~PostgresTableReader(); + TupleTableSlot *GetNextTuple(); + void PostgresTableReaderCleanup(); + +private: + MinimalTuple GetNextWorkerTuple(); + int ParallelWorkerNumber(Cardinality cardinality); + const char *ExplainScanPlan(QueryDesc *query_desc); + bool CanTableScanRunInParallel(Plan *plan); + bool MarkPlanParallelAware(Plan *plan); + +private: + QueryDesc *table_scan_query_desc; + PlanState *table_scan_planstate; + ParallelExecutorInfo *parallel_executor_info; + void **parallel_worker_readers; + TupleTableSlot *slot; + int nworkers_launched; + int nreaders; + int next_parallel_reader; + bool entered_parallel_mode; + bool cleaned_up; +}; + +} // namespace pgduckdb diff --git a/sql/pg_duckdb--0.2.0--0.3.0.sql b/sql/pg_duckdb--0.2.0--0.3.0.sql index f64f71b7..a23bb628 100644 --- a/sql/pg_duckdb--0.2.0--0.3.0.sql +++ b/sql/pg_duckdb--0.2.0--0.3.0.sql @@ -14,3 +14,6 @@ CREATE AGGREGATE @extschema@.approx_count_distinct(anyelement) stype = bigint, initcond = 0 ); + +CREATE DOMAIN pg_catalog.blob AS bytea; +COMMENT ON DOMAIN pg_catalog.blob IS 'The DuckDB BLOB alias for BYTEA'; diff --git a/src/catalog/pgduckdb_table.cpp b/src/catalog/pgduckdb_table.cpp index 6bc3dce8..05778d6a 100644 --- a/src/catalog/pgduckdb_table.cpp +++ b/src/catalog/pgduckdb_table.cpp @@ -1,11 +1,11 @@ #include "pgduckdb/catalog/pgduckdb_table.hpp" +#include "pgduckdb/scan/postgres_scan.hpp" #include "pgduckdb/catalog/pgduckdb_schema.hpp" #include "pgduckdb/logger.hpp" #include "pgduckdb/pg/relations.hpp" #include "pgduckdb/pgduckdb_process_lock.hpp" #include "pgduckdb/pgduckdb_types.hpp" // ConvertPostgresToDuckColumnType -#include "pgduckdb/scan/postgres_seq_scan.hpp" #include "duckdb/parser/parsed_data/create_table_info.hpp" @@ -20,13 +20,13 @@ PostgresTable::PostgresTable(duckdb::Catalog &_catalog, duckdb::SchemaCatalogEnt } PostgresTable::~PostgresTable() { - std::lock_guard lock(DuckdbProcessLock::GetLock()); + std::lock_guard lock(GlobalProcessLock::GetLock()); CloseRelation(rel); } Relation PostgresTable::OpenRelation(Oid relid) { - std::lock_guard lock(DuckdbProcessLock::GetLock()); + std::lock_guard lock(GlobalProcessLock::GetLock()); return pgduckdb::OpenRelation(relid); } @@ -46,38 +46,19 @@ PostgresTable::SetTableInfo(duckdb::CreateTableInfo &info, Relation rel) { } } -Cardinality -PostgresTable::GetTableCardinality(Relation rel) { - Cardinality cardinality; - BlockNumber n_pages; - double allvisfrac; - EstimateRelSize(rel, NULL, &n_pages, &cardinality, &allvisfrac); - return cardinality; -} - -//===--------------------------------------------------------------------===// -// PostgresHeapTable -//===--------------------------------------------------------------------===// - -PostgresHeapTable::PostgresHeapTable(duckdb::Catalog &_catalog, duckdb::SchemaCatalogEntry &_schema, - duckdb::CreateTableInfo &_info, Relation _rel, Cardinality _cardinality, - Snapshot _snapshot) - : PostgresTable(_catalog, _schema, _info, _rel, _cardinality, _snapshot) { -} - duckdb::unique_ptr -PostgresHeapTable::GetStatistics(duckdb::ClientContext &, duckdb::column_t) { +PostgresTable::GetStatistics(duckdb::ClientContext &, duckdb::column_t) { throw duckdb::NotImplementedException("GetStatistics not supported yet"); } duckdb::TableFunction -PostgresHeapTable::GetScanFunction(duckdb::ClientContext &, duckdb::unique_ptr &bind_data) { - bind_data = duckdb::make_uniq(rel, cardinality, snapshot); - return PostgresSeqScanFunction(); +PostgresTable::GetScanFunction(duckdb::ClientContext &, duckdb::unique_ptr &bind_data) { + bind_data = duckdb::make_uniq(rel, cardinality, snapshot); + return PostgresScanTableFunction(); } duckdb::TableStorageInfo -PostgresHeapTable::GetStorageInfo(duckdb::ClientContext &) { +PostgresTable::GetStorageInfo(duckdb::ClientContext &) { throw duckdb::NotImplementedException("GetStorageInfo not supported yet"); } diff --git a/src/catalog/pgduckdb_transaction.cpp b/src/catalog/pgduckdb_transaction.cpp index a40eb3a6..93fc0f91 100644 --- a/src/catalog/pgduckdb_transaction.cpp +++ b/src/catalog/pgduckdb_transaction.cpp @@ -39,24 +39,20 @@ SchemaItems::GetTable(const duckdb::string &entry_name) { } Oid rel_oid = GetRelidFromSchemaAndTable(name.c_str(), entry_name.c_str()); + if (!IsValidOid(rel_oid)) { return nullptr; // Table could not be found } Relation rel = PostgresTable::OpenRelation(rel_oid); - if (IsRelView(rel)) { - // Let the replacement scan handle this, the ReplacementScan replaces the view with its view_definition, which - // will get bound again and hit a PostgresIndexTable / PostgresHeapTable. - return nullptr; - } duckdb::CreateTableInfo info; info.table = entry_name; PostgresTable::SetTableInfo(info, rel); - auto cardinality = PostgresTable::GetTableCardinality(rel); - tables.emplace(entry_name, duckdb::make_uniq(schema->catalog, *schema, info, rel, cardinality, - schema->snapshot)); + auto cardinality = EstimateRelSize(rel); + tables.emplace(entry_name, duckdb::make_uniq(schema->catalog, *schema, info, rel, cardinality, + schema->snapshot)); return tables[entry_name].get(); } diff --git a/src/pg/relations.cpp b/src/pg/relations.cpp index 1bb1b6be..ed0a8c01 100644 --- a/src/pg/relations.cpp +++ b/src/pg/relations.cpp @@ -8,15 +8,36 @@ extern "C" { #include "access/relation.h" // relation_open and relation_close #include "catalog/namespace.h" // makeRangeVarFromNameList, RangeVarGetRelid #include "optimizer/plancat.h" // estimate_rel_size +#include "utils/builtins.h" +#include "utils/lsyscache.h" #include "utils/rel.h" -#include "utils/resowner.h" // CurrentResourceOwner and TopTransactionResourceOwner -#include "utils/syscache.h" // RELOID +#include "utils/resowner.h" // CurrentResourceOwner and TopTransactionResourceOwner +#include "executor/tuptable.h" // TupIsNull +#include "utils/syscache.h" // RELOID } namespace pgduckdb { #undef RelationGetDescr +#if PG_VERSION_NUM < 150000 +// clang-format off + +/* + * Relation kinds with a table access method (rd_tableam). Although sequences + * use the heap table AM, they are enough of a special case in most uses that + * they are not included here. Likewise, partitioned tables can have an access + * method defined so that their partitions can inherit it, but they do not set + * rd_tableam; hence, this is handled specially outside of this macro. + */ +#define RELKIND_HAS_TABLE_AM(relkind) \ + ((relkind) == RELKIND_RELATION || \ + (relkind) == RELKIND_TOASTVALUE || \ + (relkind) == RELKIND_MATVIEW) + +// clang-format on +#endif + TupleDesc RelationGetDescr(Relation rel) { return rel->rd_att; @@ -37,6 +58,16 @@ GetAttr(const TupleDesc tupleDesc, int i) { return &tupleDesc->attrs[i]; } +bool +TupleIsNull(TupleTableSlot *slot) { + return TupIsNull(slot); +} + +void +SlotGetAllAttrs(TupleTableSlot *slot) { + PostgresFunctionGuard(slot_getallattrs, slot); +} + Relation OpenRelation(Oid relationId) { /* @@ -67,9 +98,17 @@ CloseRelation(Relation rel) { CurrentResourceOwner = saveResourceOwner; } -void -EstimateRelSize(Relation rel, int32_t *attr_widths, BlockNumber *pages, double *tuples, double *allvisfrac) { - PostgresFunctionGuard(estimate_rel_size, rel, attr_widths, pages, tuples, allvisfrac); +double +EstimateRelSize(Relation rel) { + Cardinality cardinality = 0; + + if (RELKIND_HAS_TABLE_AM(rel->rd_rel->relkind) || rel->rd_rel->relkind == RELKIND_INDEX) { + BlockNumber pages; + double allvisfrac; + PostgresFunctionGuard(estimate_rel_size, rel, nullptr, &pages, &cardinality, &allvisfrac); + } + + return cardinality; } Oid @@ -91,14 +130,39 @@ IsValidOid(Oid oid) { return oid != InvalidOid; } -bool -IsRelView(Relation rel) { - return rel->rd_rel->relkind == RELKIND_VIEW; -} - bool IsValidBlockNumber(BlockNumber block_number) { return block_number != InvalidBlockNumber; } +/* + * generate_qualified_relation_name + * Compute the name to display for a relation specified by OID + * + * As above, but unconditionally schema-qualify the name. + */ +static char * +GenerateQualifiedRelationName_Unsafe(Relation rel) { + char *nspname = get_namespace_name_or_temp(rel->rd_rel->relnamespace); + if (!nspname) + elog(ERROR, "cache lookup failed for namespace %u", rel->rd_rel->relnamespace); + + return quote_qualified_identifier(nspname, NameStr(rel->rd_rel->relname)); +} + +char * +GenerateQualifiedRelationName(Relation rel) { + return PostgresFunctionGuard(GenerateQualifiedRelationName_Unsafe, rel); +} + +const char * +QuoteIdentifier(const char *ident) { + return PostgresFunctionGuard(quote_identifier, ident); +} + +const char * +GetRelationName(Relation rel) { + return RelationGetRelationName(rel); +} + } // namespace pgduckdb diff --git a/src/pgduckdb.cpp b/src/pgduckdb.cpp index f914ac2a..763c6969 100644 --- a/src/pgduckdb.cpp +++ b/src/pgduckdb.cpp @@ -4,6 +4,7 @@ extern "C" { #include "postgres.h" #include "miscadmin.h" #include "utils/guc.h" +#include "postmaster/bgworker_internals.h" } #include "pgduckdb/pgduckdb.h" @@ -14,7 +15,7 @@ extern "C" { static void DuckdbInitGUC(void); bool duckdb_force_execution = false; -int duckdb_max_threads_per_postgres_scan = 1; +int duckdb_max_workers_per_postgres_scan = 2; int duckdb_motherduck_enabled = MotherDuckEnabled::MOTHERDUCK_AUTO; char *duckdb_motherduck_token = strdup(""); char *duckdb_motherduck_postgres_database = strdup("postgres"); @@ -159,9 +160,9 @@ DuckdbInitGUC(void) { "Maximum number of DuckDB threads per Postgres backend, alias for duckdb.threads", &duckdb_maximum_threads, -1, 1024, PGC_SUSET); - DefineCustomVariable("duckdb.max_threads_per_postgres_scan", - "Maximum number of DuckDB threads used for a single Postgres scan", - &duckdb_max_threads_per_postgres_scan, 1, 64); + DefineCustomVariable("duckdb.max_workers_per_postgres_scan", + "Maximum number of PostgreSQL workers used for a single Postgres scan", + &duckdb_max_workers_per_postgres_scan, 0, MAX_PARALLEL_WORKER_LIMIT); DefineCustomVariable("duckdb.postgres_role", "Which postgres role should be allowed to use DuckDB execution, use the secrets and create " diff --git a/src/pgduckdb_detoast.cpp b/src/pgduckdb_detoast.cpp index 1597b8da..4f02c580 100644 --- a/src/pgduckdb_detoast.cpp +++ b/src/pgduckdb_detoast.cpp @@ -125,7 +125,7 @@ ToastFetchDatum(struct varlena *attr) { return result; } - std::lock_guard lock(DuckdbProcessLock::GetLock()); + std::lock_guard lock(GlobalProcessLock::GetLock()); if (!PostgresFunctionGuard(table_relation_fetch_toast_slice, toast_pointer, attrsize, result)) { duckdb_free(result); diff --git a/src/pgduckdb_duckdb.cpp b/src/pgduckdb_duckdb.cpp index 70331dfb..f80eadef 100644 --- a/src/pgduckdb_duckdb.cpp +++ b/src/pgduckdb_duckdb.cpp @@ -8,7 +8,6 @@ #include "pgduckdb/catalog/pgduckdb_storage.hpp" #include "pgduckdb/scan/postgres_scan.hpp" -#include "pgduckdb/scan/postgres_seq_scan.hpp" #include "pgduckdb/pg/transactions.hpp" #include "pgduckdb/pgduckdb_utils.hpp" @@ -135,8 +134,7 @@ DuckDBManager::Initialize() { duckdb::DBConfig config; config.SetOptionByName("custom_user_agent", "pg_duckdb"); config.SetOptionByName("extension_directory", CreateOrGetDirectoryPath("duckdb_extensions")); - // Transforms VIEWs into their view definition - config.replacement_scans.emplace_back(pgduckdb::PostgresReplacementScan); + SET_DUCKDB_OPTION(allow_unsigned_extensions); SET_DUCKDB_OPTION(enable_external_access); SET_DUCKDB_OPTION(autoinstall_known_extensions); @@ -208,14 +206,9 @@ DuckDBManager::Initialize() { void DuckDBManager::LoadFunctions(duckdb::ClientContext &context) { - pgduckdb::PostgresSeqScanFunction seq_scan_fun; - duckdb::CreateTableFunctionInfo seq_scan_info(seq_scan_fun); - - auto &catalog = duckdb::Catalog::GetSystemCatalog(context); context.transaction.BeginTransaction(); auto &instance = *database->instance; duckdb::ExtensionUtil::RegisterType(instance, "UnsupportedPostgresType", duckdb::LogicalTypeId::VARCHAR); - catalog.CreateTableFunction(context, &seq_scan_info); context.transaction.Commit(); } diff --git a/src/pgduckdb_hooks.cpp b/src/pgduckdb_hooks.cpp index 6936edbe..92a6801c 100644 --- a/src/pgduckdb_hooks.cpp +++ b/src/pgduckdb_hooks.cpp @@ -60,23 +60,6 @@ ContainsCatalogTable(List *rtes) { return false; } -static bool -ContainsPartitionedTable(List *rtes) { - foreach_node(RangeTblEntry, rte, rtes) { - if (rte->rtekind == RTE_SUBQUERY) { - /* Check whether any table in the subquery is a partitioned table */ - if (ContainsPartitionedTable(rte->subquery->rtable)) { - return true; - } - } - - if (rte->relkind == RELKIND_PARTITIONED_TABLE) { - return true; - } - } - return false; -} - static bool IsDuckdbTable(Oid relid) { if (relid == InvalidOid) { @@ -178,14 +161,6 @@ IsAllowedStatement(Query *query, bool throw_error = false) { return false; } - /* - * When accessing the partitioned table, we temporarily let PG handle it instead of DuckDB. - */ - if (ContainsPartitionedTable(query->rtable)) { - elog(elevel, "DuckDB does not support querying PG partitioned table"); - return false; - } - /* Anything else is hopefully fine... */ return true; } diff --git a/src/pgduckdb_node.cpp b/src/pgduckdb_node.cpp index 355c2c89..9c796ba6 100644 --- a/src/pgduckdb_node.cpp +++ b/src/pgduckdb_node.cpp @@ -137,8 +137,12 @@ ExecuteQuery(DuckdbScanState *state) { } duckdb::PendingExecutionResult execution_result; - do { + while (true) { execution_result = pending->ExecuteTask(); + if (duckdb::PendingQueryResult::IsResultReady(execution_result)) { + break; + } + if (QueryCancelPending) { // Send an interrupt connection->Interrupt(); @@ -150,7 +154,7 @@ ExecuteQuery(DuckdbScanState *state) { ProcessInterrupts(); throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, "Query cancelled"); } - } while (!duckdb::PendingQueryResult::IsResultReady(execution_result)); + } if (execution_result == duckdb::PendingExecutionResult::EXECUTION_ERROR) { return pending->ThrowError(); @@ -254,10 +258,9 @@ Duckdb_ExplainCustomScan_Cpp(CustomScanState *node, ExplainState *es) { chunk = duckdb_scan_state->query_results->Fetch(); } while (chunk && chunk->size() > 0); - std::string explain_output = "\n\n"; - explain_output += value; - explain_output += "\n"; - ExplainPropertyText("DuckDB Execution Plan", explain_output.c_str(), es); + std::ostringstream explain_output; + explain_output << "\n\n" << value << "\n"; + ExplainPropertyText("DuckDB Execution Plan", explain_output.str().c_str(), es); } void diff --git a/src/pgduckdb_ruleutils.cpp b/src/pgduckdb_ruleutils.cpp index 4622b061..5624b586 100644 --- a/src/pgduckdb_ruleutils.cpp +++ b/src/pgduckdb_ruleutils.cpp @@ -153,6 +153,7 @@ pgduckdb_relation_name(Oid relation_oid) { } const char *db_and_schema = pgduckdb_db_and_schema_string(postgres_schema_name, is_duckdb_table); + char *result = psprintf("%s.%s", db_and_schema, quote_identifier(relname)); ReleaseSysCache(tp); diff --git a/src/pgduckdb_types.cpp b/src/pgduckdb_types.cpp index c8ee60ac..2597dece 100644 --- a/src/pgduckdb_types.cpp +++ b/src/pgduckdb_types.cpp @@ -1348,157 +1348,33 @@ ConvertPostgresToDuckValue(Oid attr_type, Datum value, duckdb::Vector &result, i } } -typedef struct HeapTupleReadState { - bool m_slow = 0; - int m_last_tuple_att = 0; - uint32 m_page_tuple_offset = 0; -} HeapTupleReadState; - -static Datum -HeapTupleFetchNextColumnDatum(TupleDesc tupleDesc, HeapTuple tuple, HeapTupleReadState &heap_tuple_read_state, - AttrNumber target_attr_num, bool *is_null, const duckdb::map &missing_attrs) { - HeapTupleHeader tup = tuple->t_data; - bool hasnulls = HeapTupleHasNulls(tuple); - int natts = HeapTupleHeaderGetNatts(tup); - bits8 *null_bitmap = tup->t_bits; - - if (natts < target_attr_num) { - if (auto missing_attr = missing_attrs.find(target_attr_num - 1); missing_attr != missing_attrs.end()) { - *is_null = false; - return missing_attr->second; - } else { - *is_null = true; - return PointerGetDatum(NULL); - } - } - - /* Which tuple are we currently reading */ - AttrNumber current_attr_num = heap_tuple_read_state.m_last_tuple_att + 1; - /* Either restore from previous fetch, or use the defaults of 0 and false */ - uint32 current_tuple_offset = heap_tuple_read_state.m_page_tuple_offset; - bool slow = heap_tuple_read_state.m_slow; - - /* Points to the start of the tuple data section, i.e. right after the - * tuple header */ - char *tuple_data = (char *)tup + tup->t_hoff; - - Datum value = (Datum)0; - for (; current_attr_num <= target_attr_num; current_attr_num++) { - Form_pg_attribute thisatt = TupleDescAttr(tupleDesc, current_attr_num - 1); - - if (hasnulls && att_isnull(current_attr_num - 1, null_bitmap)) { - value = (Datum)0; - *is_null = true; - /* - * Can't use attcacheoff anymore. The hardcoded attribute offset - * assumes all attribute before it are present in the tuple. If - * they are NULL, they are not present. - */ - slow = true; - continue; - } - - *is_null = false; - - if (!slow && thisatt->attcacheoff >= 0) { - current_tuple_offset = thisatt->attcacheoff; - } else if (thisatt->attlen == -1) { - if (!slow && current_tuple_offset == att_align_nominal(current_tuple_offset, thisatt->attalign)) { - thisatt->attcacheoff = current_tuple_offset; - } else { - current_tuple_offset = - att_align_pointer(current_tuple_offset, thisatt->attalign, -1, tuple_data + current_tuple_offset); - slow = true; - } - } else { - current_tuple_offset = att_align_nominal(current_tuple_offset, thisatt->attalign); - if (!slow) { - thisatt->attcacheoff = current_tuple_offset; - } - } - - value = fetchatt(thisatt, tuple_data + current_tuple_offset); - - current_tuple_offset = - att_addlength_pointer(current_tuple_offset, thisatt->attlen, tuple_data + current_tuple_offset); - - if (thisatt->attlen <= 0) { - slow = true; - } - } - - heap_tuple_read_state.m_last_tuple_att = target_attr_num; - heap_tuple_read_state.m_page_tuple_offset = current_tuple_offset; - heap_tuple_read_state.m_slow = slow; - - return value; -} - void -InsertTupleIntoChunk(duckdb::DataChunk &output, duckdb::shared_ptr scan_global_state, - duckdb::shared_ptr scan_local_state, HeapTupleData *tuple) { - HeapTupleReadState heap_tuple_read_state = {}; - - if (scan_global_state->m_count_tuples_only) { - scan_local_state->m_output_vector_size++; - return; - } +InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_local_state, TupleTableSlot *slot) { - auto &values = scan_local_state->values; - auto &nulls = scan_local_state->nulls; - - /* First we are fetching all required columns ordered by column id - * and than we need to write this tuple into output vector. Output column id list - * could be out of order so we need to match column values from ordered list. - */ - - /* Read heap tuple with all required columns. */ - for (auto const &[attr_num, duckdb_scanned_index] : scan_global_state->m_columns_to_scan) { - bool is_null = false; - values[duckdb_scanned_index] = - HeapTupleFetchNextColumnDatum(scan_global_state->m_tuple_desc, tuple, heap_tuple_read_state, attr_num, - &is_null, scan_global_state->m_relation_missing_attrs); - nulls[duckdb_scanned_index] = is_null; - auto filter = scan_global_state->m_column_filters[duckdb_scanned_index]; - if (!filter) { - continue; - } + auto scan_global_state = scan_local_state.global_state; - const auto valid_tuple = ApplyValueFilter(*filter, values[duckdb_scanned_index], is_null, - scan_global_state->m_tuple_desc->attrs[attr_num - 1].atttypid); - if (!valid_tuple) { - return; - } + if (scan_global_state->count_tuples_only) { + /* COUNT(*) returned tuple will have only one value returned as first tuple element. */ + scan_global_state->total_row_count += slot->tts_values[0]; + scan_local_state.output_vector_size += slot->tts_values[0]; + return; } - /* Write tuple columns in output vector. */ - int duckdb_output_index = 0; - for (auto const &[duckdb_scanned_index, attr_num] : scan_global_state->m_output_columns) { + for (int duckdb_output_index = 0; duckdb_output_index < slot->tts_tupleDescriptor->natts; duckdb_output_index++) { auto &result = output.data[duckdb_output_index]; - if (nulls[duckdb_scanned_index]) { + if (slot->tts_isnull[duckdb_output_index]) { auto &array_mask = duckdb::FlatVector::Validity(result); - array_mask.SetInvalid(scan_local_state->m_output_vector_size); + array_mask.SetInvalid(scan_local_state.output_vector_size); } else { - auto attr = scan_global_state->m_tuple_desc->attrs[attr_num - 1]; - if (attr.attlen == -1) { - bool should_free = false; - values[duckdb_scanned_index] = - DetoastPostgresDatum(reinterpret_cast(values[duckdb_scanned_index]), &should_free); - ConvertPostgresToDuckValue(attr.atttypid, values[duckdb_scanned_index], result, - scan_local_state->m_output_vector_size); - if (should_free) { - duckdb_free(reinterpret_cast(values[duckdb_scanned_index])); - } - } else { - ConvertPostgresToDuckValue(attr.atttypid, values[duckdb_scanned_index], result, - scan_local_state->m_output_vector_size); - } + /* Use returned tuple slot attr information. */ + auto attr = slot->tts_tupleDescriptor->attrs[duckdb_output_index]; + ConvertPostgresToDuckValue(attr.atttypid, slot->tts_values[duckdb_output_index], result, + scan_local_state.output_vector_size); } - duckdb_output_index++; } - scan_local_state->m_output_vector_size++; - scan_global_state->m_total_row_count++; + scan_local_state.output_vector_size++; + scan_global_state->total_row_count++; } NumericVar diff --git a/src/scan/heap_reader.cpp b/src/scan/heap_reader.cpp deleted file mode 100644 index 95344441..00000000 --- a/src/scan/heap_reader.cpp +++ /dev/null @@ -1,180 +0,0 @@ -#include "duckdb.hpp" - -#include "pgduckdb/scan/heap_reader.hpp" -#include "pgduckdb/pgduckdb_types.hpp" -#include "pgduckdb/pgduckdb_utils.hpp" - -extern "C" { -#include "postgres.h" -#include "pgstat.h" -#include "access/heapam.h" -#include "storage/bufmgr.h" -#include "storage/bufpage.h" -#include "utils/rel.h" -} - -#include "pgduckdb/pgduckdb_process_lock.hpp" - -#include - -namespace pgduckdb { - -// -// HeapReaderGlobalState -// - -HeapReaderGlobalState::HeapReaderGlobalState(Relation rel) - : m_nblocks(RelationGetNumberOfBlocks(rel)), m_last_assigned_block_number(InvalidBlockNumber) { -} - -BlockNumber -HeapReaderGlobalState::AssignNextBlockNumber(std::mutex &lock) { - lock.lock(); - BlockNumber block_number = InvalidBlockNumber; - if (m_nblocks > 0 && m_last_assigned_block_number == InvalidBlockNumber) { - block_number = m_last_assigned_block_number = 0; - } else if (m_nblocks > 0 && m_last_assigned_block_number < m_nblocks - 1) { - block_number = ++m_last_assigned_block_number; - } - lock.unlock(); - return block_number; -} - -// -// HeapReader -// - -HeapReader::HeapReader(Relation rel, duckdb::shared_ptr heap_reader_global_state, - duckdb::shared_ptr global_state, - duckdb::shared_ptr local_state) - : m_global_state(global_state), m_heap_reader_global_state(heap_reader_global_state), m_local_state(local_state), - m_rel(rel), m_inited(false), m_read_next_page(true), m_block_number(InvalidBlockNumber), m_buffer(InvalidBuffer), - m_current_tuple_index(InvalidOffsetNumber), m_page_tuples_left(0) { - m_tuple = duckdb::make_uniq(); - m_tuple->t_data = NULL; - m_tuple->t_tableOid = RelationGetRelid(m_rel); - ItemPointerSetInvalid(&m_tuple->t_self); - DuckdbProcessLock::GetLock().lock(); - m_buffer_access_strategy = GetAccessStrategy(BAS_BULKREAD); - DuckdbProcessLock::GetLock().unlock(); -} - -HeapReader::~HeapReader() { - DuckdbProcessLock::GetLock().lock(); - /* If execution is interrupted and buffer is still opened close it now */ - if (m_buffer != InvalidBuffer) { - UnlockReleaseBuffer(m_buffer); - } - FreeAccessStrategy(m_buffer_access_strategy); - DuckdbProcessLock::GetLock().unlock(); -} - -Page -HeapReader::PreparePageRead() { - Page page = BufferGetPage(m_buffer); -#if PG_VERSION_NUM < 170000 - TestForOldSnapshot(m_global_state->m_snapshot, m_rel, page); -#endif - m_page_tuples_all_visible = PageIsAllVisible(page) && !m_global_state->m_snapshot->takenDuringRecovery; - m_page_tuples_left = PageGetMaxOffsetNumber(page) - FirstOffsetNumber + 1; - m_current_tuple_index = FirstOffsetNumber; - return page; -} - -bool -HeapReader::ReadPageTuples(duckdb::DataChunk &output) { - BlockNumber block = InvalidBlockNumber; - Page page = nullptr; - - if (!m_inited) { - block = m_block_number = m_heap_reader_global_state->AssignNextBlockNumber(m_global_state->m_lock); - if (m_block_number == InvalidBlockNumber) { - return false; - } - m_inited = true; - m_read_next_page = true; - } else { - block = m_block_number; - if (!m_read_next_page) { - page = BufferGetPage(m_buffer); - } - } - - while (block != InvalidBlockNumber) { - if (m_read_next_page) { - CHECK_FOR_INTERRUPTS(); - std::lock_guard lock(DuckdbProcessLock::GetLock()); - block = m_block_number; - - m_buffer = PostgresFunctionGuard(ReadBufferExtended, m_rel, MAIN_FORKNUM, block, RBM_NORMAL, - m_buffer_access_strategy); - - PostgresFunctionGuard(LockBuffer, m_buffer, BUFFER_LOCK_SHARE); - - page = PreparePageRead(); - m_read_next_page = false; - } - - for (; m_page_tuples_left > 0 && m_local_state->m_output_vector_size < STANDARD_VECTOR_SIZE; - m_page_tuples_left--, m_current_tuple_index++) { - bool visible = true; - ItemId lpp = PageGetItemId(page, m_current_tuple_index); - - if (!ItemIdIsNormal(lpp)) - continue; - - m_tuple->t_data = (HeapTupleHeader)PageGetItem(page, lpp); - m_tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(m_tuple->t_self), block, m_current_tuple_index); - - if (!m_page_tuples_all_visible) { - std::lock_guard lock(DuckdbProcessLock::GetLock()); - visible = HeapTupleSatisfiesVisibility(m_tuple.get(), m_global_state->m_snapshot, m_buffer); - /* skip tuples not visible to this snapshot */ - if (!visible) - continue; - } - - pgstat_count_heap_getnext(m_rel); - InsertTupleIntoChunk(output, m_global_state, m_local_state, m_tuple.get()); - } - - /* No more items on current page */ - if (!m_page_tuples_left) { - DuckdbProcessLock::GetLock().lock(); - UnlockReleaseBuffer(m_buffer); - DuckdbProcessLock::GetLock().unlock(); - m_buffer = InvalidBuffer; - m_read_next_page = true; - /* Handle cancel request */ - if (QueryCancelPending) { - block = m_block_number = InvalidBlockNumber; - } else { - block = m_block_number = m_heap_reader_global_state->AssignNextBlockNumber(m_global_state->m_lock); - } - } - - /* We have collected STANDARD_VECTOR_SIZE */ - if (m_local_state->m_output_vector_size == STANDARD_VECTOR_SIZE) { - output.SetCardinality(m_local_state->m_output_vector_size); - output.Verify(); - m_local_state->m_output_vector_size = 0; - return true; - } - } - - /* Next assigned block number is InvalidBlockNumber so we check did we write any tuples in output vector */ - if (m_local_state->m_output_vector_size) { - output.SetCardinality(m_local_state->m_output_vector_size); - output.Verify(); - m_local_state->m_output_vector_size = 0; - } - - m_buffer = InvalidBuffer; - m_block_number = InvalidBlockNumber; - m_tuple->t_data = NULL; - m_read_next_page = false; - - return false; -} -} // namespace pgduckdb diff --git a/src/scan/postgres_scan.cpp b/src/scan/postgres_scan.cpp index ffa9603e..11a85306 100644 --- a/src/scan/postgres_scan.cpp +++ b/src/scan/postgres_scan.cpp @@ -1,46 +1,26 @@ -#include "duckdb/main/client_context.hpp" -#include "duckdb/function/replacement_scan.hpp" -#include "duckdb/parser/tableref/table_function_ref.hpp" -#include "duckdb/parser/parser.hpp" -#include "duckdb/parser/tableref/subqueryref.hpp" -#include "duckdb/parser/expression/function_expression.hpp" -#include "duckdb/parser/statement/select_statement.hpp" -#include "duckdb/parser/expression/constant_expression.hpp" -#include "duckdb/parser/expression/comparison_expression.hpp" -#include "duckdb/parser/expression/columnref_expression.hpp" -#include "duckdb/parser/qualified_name.hpp" -#include "duckdb/common/enums/statement_type.hpp" -#include "duckdb/common/enums/expression_type.hpp" - #include "pgduckdb/scan/postgres_scan.hpp" +#include "pgduckdb/scan/postgres_table_reader.hpp" #include "pgduckdb/pgduckdb_types.hpp" #include "pgduckdb/pgduckdb_utils.hpp" - -extern "C" { -#include "postgres.h" -#include "access/htup_details.h" -#include "catalog/namespace.h" -#include "catalog/pg_class.h" -#include "optimizer/planmain.h" -#include "optimizer/planner.h" -#include "utils/builtins.h" -#include "utils/regproc.h" -#include "utils/snapmgr.h" -#include "utils/syscache.h" -} +#include "pgduckdb/pg/relations.hpp" #include "pgduckdb/pgduckdb_process_lock.hpp" +#include "pgduckdb/logger.hpp" namespace pgduckdb { +// +// PostgresScanGlobalState +// + void -PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input) { +PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput &input) { /* SELECT COUNT(*) FROM */ if (input.column_ids.size() == 1 && input.column_ids[0] == UINT64_MAX) { - m_count_tuples_only = true; + scan_query << "SELECT COUNT(*) FROM " << pgduckdb::GenerateQualifiedRelationName(rel); + count_tuples_only = true; return; } - /* * We need to read columns from the Postgres tuple in column order, but for * outputting them we care about the DuckDB order. A map automatically @@ -55,10 +35,12 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input) } auto table_filters = input.filters.get(); - m_column_filters.resize(input.column_ids.size(), 0); + + std::vector> columns_to_scan; + std::vector column_filters(input.column_ids.size(), 0); for (auto const &[att_num, duckdb_scanned_index] : pg_column_order) { - m_columns_to_scan.emplace_back(att_num, duckdb_scanned_index); + columns_to_scan.emplace_back(att_num, duckdb_scanned_index); if (!table_filters) { continue; @@ -66,7 +48,7 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input) auto column_filter_it = table_filters->filters.find(duckdb_scanned_index); if (column_filter_it != table_filters->filters.end()) { - m_column_filters[duckdb_scanned_index] = column_filter_it->second.get(); + column_filters[duckdb_scanned_index] = column_filter_it->second.get(); } } @@ -78,99 +60,164 @@ PostgresScanGlobalState::InitGlobalState(duckdb::TableFunctionInitInput &input) */ if (input.CanRemoveFilterColumns()) { for (const auto &projection_id : input.projection_ids) { - m_output_columns.emplace_back(projection_id, input.column_ids[projection_id] + 1); + output_columns.emplace_back(input.column_ids[projection_id] + 1); } } else { - duckdb::idx_t output_index = 0; for (const auto &column_id : input.column_ids) { - m_output_columns.emplace_back(output_index++, column_id + 1); + output_columns.emplace_back(column_id + 1); } } -} -void -PostgresScanGlobalState::InitRelationMissingAttrs(TupleDesc tuple_desc) { - std::lock_guard lock(DuckdbProcessLock::GetLock()); - for (int attnum = 0; attnum < tuple_desc->natts; attnum++) { - bool is_null = false; - Datum attr = PostgresFunctionGuard(getmissingattr, tuple_desc, attnum + 1, &is_null); - /* Add missing attr datum if not null*/ - if (!is_null) { - m_relation_missing_attrs[attnum] = attr; + scan_query << "SELECT "; + + bool first = true; + for (auto const &attr_num : output_columns) { + if (!first) { + scan_query << ", "; } + first = false; + auto attr = GetAttr(table_tuple_desc, attr_num - 1); + scan_query << pgduckdb::QuoteIdentifier(GetAttName(attr)); } -} -static Oid -FindMatchingRelation(const duckdb::string &schema, const duckdb::string &table) { - List *name_list = NIL; - if (!schema.empty()) { - name_list = lappend(name_list, makeString(pstrdup(schema.c_str()))); + scan_query << " FROM " << GenerateQualifiedRelationName(rel); + + first = true; + + for (auto const &[attr_num, duckdb_scanned_index] : columns_to_scan) { + auto filter = column_filters[duckdb_scanned_index]; + + if (!filter) { + continue; + } + + if (first) { + scan_query << " WHERE "; + } else { + scan_query << " AND "; + } + + first = false; + scan_query << "("; + auto attr = GetAttr(table_tuple_desc, attr_num - 1); + auto col = pgduckdb::QuoteIdentifier(GetAttName(attr)); + scan_query << filter->ToString(col).c_str(); + scan_query << ") "; } +} + +PostgresScanGlobalState::PostgresScanGlobalState(Snapshot _snapshot, Relation _rel, + duckdb::TableFunctionInitInput &input) + : snapshot(_snapshot), rel(_rel), table_tuple_desc(RelationGetDescr(rel)), count_tuples_only(false), + total_row_count(0) { + ConstructTableScanQuery(input); + table_reader_global_state = + duckdb::make_shared_ptr(scan_query.str().c_str(), count_tuples_only); + pd_log(DEBUG2, "(DuckDB/PostgresSeqScanGlobalState) Running %" PRIu64 " threads -- ", (uint64_t)MaxThreads()); +} + +PostgresScanGlobalState::~PostgresScanGlobalState() { +} - name_list = lappend(name_list, makeString(pstrdup(table.c_str()))); +// +// PostgresScanLocalState +// - RangeVar *table_range_var = makeRangeVarFromNameList(name_list); - return RangeVarGetRelid(table_range_var, AccessShareLock, true); +PostgresScanLocalState::PostgresScanLocalState(PostgresScanGlobalState *_global_state) + : global_state(_global_state), exhausted_scan(false) { } -const char * -pgduckdb_pg_get_viewdef(Oid view) { - auto oid = ObjectIdGetDatum(view); - Datum viewdef = DirectFunctionCall1(pg_get_viewdef, oid); - return text_to_cstring(DatumGetTextP(viewdef)); +PostgresScanLocalState::~PostgresScanLocalState() { } -duckdb::unique_ptr -ReplaceView(Oid view) { - const auto view_definition = PostgresFunctionGuard(pgduckdb_pg_get_viewdef, view); +// +// PostgresSeqScanFunctionData +// - if (!view_definition) { - throw duckdb::InvalidInputException("Could not retrieve view definition for Relation with relid: %u", view); - } +PostgresScanFunctionData::PostgresScanFunctionData(Relation _rel, uint64_t _cardinality, Snapshot _snapshot) + : rel(_rel), cardinality(_cardinality), snapshot(_snapshot) { +} - duckdb::Parser parser; - parser.ParseQuery(view_definition); - auto &statements = parser.statements; - if (statements.size() != 1) { - throw duckdb::InvalidInputException("View definition contained more than 1 statement!"); - } +PostgresScanFunctionData::~PostgresScanFunctionData() { +} - if (statements[0]->type != duckdb::StatementType::SELECT_STATEMENT) { - throw duckdb::InvalidInputException("View definition (%s) did not contain a SELECT statement!", - view_definition); - } +// +// PostgresScanFunction +// + +PostgresScanTableFunction::PostgresScanTableFunction() + : TableFunction("postgres_scan", {}, PostgresScanFunction, nullptr, PostgresScanInitGlobal, PostgresScanInitLocal) { + named_parameters["cardinality"] = duckdb::LogicalType::UBIGINT; + named_parameters["relid"] = duckdb::LogicalType::UINTEGER; + named_parameters["snapshot"] = duckdb::LogicalType::POINTER; + projection_pushdown = true; + filter_pushdown = true; + filter_prune = true; + cardinality = PostgresScanCardinality; + to_string = ToString; +} + +std::string +PostgresScanTableFunction::ToString(const duckdb::FunctionData *data) { + auto &bind_data = data->Cast(); + std::ostringstream oss; + oss << "(POSTGRES_SCAN) " << GetRelationName(bind_data.rel); + return oss.str(); +} - auto select = duckdb::unique_ptr_cast(std::move(statements[0])); - return duckdb::make_uniq(std::move(select)); +duckdb::unique_ptr +PostgresScanTableFunction::PostgresScanInitGlobal(duckdb::ClientContext &, duckdb::TableFunctionInitInput &input) { + auto &bind_data = input.bind_data->CastNoConst(); + return duckdb::make_uniq(bind_data.snapshot, bind_data.rel, input); } -duckdb::unique_ptr -PostgresReplacementScan(duckdb::ClientContext &, duckdb::ReplacementScanInput &input, - duckdb::optional_ptr) { +duckdb::unique_ptr +PostgresScanTableFunction::PostgresScanInitLocal(duckdb::ExecutionContext &, duckdb::TableFunctionInitInput &, + duckdb::GlobalTableFunctionState *gstate) { + auto global_state = reinterpret_cast(gstate); + return duckdb::make_uniq(global_state); +} +void +SetOutputCardinality(duckdb::DataChunk &output, PostgresScanLocalState &local_state) { + idx_t output_cardinality = + local_state.output_vector_size <= STANDARD_VECTOR_SIZE ? local_state.output_vector_size : STANDARD_VECTOR_SIZE; + output.SetCardinality(output_cardinality); + local_state.output_vector_size -= output_cardinality; +} - auto &schema_name = input.schema_name; - auto &table_name = input.table_name; +void +PostgresScanTableFunction::PostgresScanFunction(duckdb::ClientContext &, duckdb::TableFunctionInput &data, + duckdb::DataChunk &output) { + auto &local_state = data.local_state->Cast(); - auto relid = PostgresFunctionGuard(FindMatchingRelation, schema_name, table_name); - if (relid == InvalidOid) { - return nullptr; + /* We have exhausted table scan */ + if (local_state.exhausted_scan) { + SetOutputCardinality(output, local_state); + return; } - auto tuple = PostgresFunctionGuard(SearchSysCache1, RELOID, ObjectIdGetDatum(relid)); - if (!HeapTupleIsValid(tuple)) { - elog(WARNING, "(PGDuckDB/PostgresReplacementScan) Cache lookup failed for relation %u", relid); - return nullptr; - } + local_state.output_vector_size = 0; - auto relForm = (Form_pg_class)GETSTRUCT(tuple); - if (relForm->relkind != RELKIND_VIEW) { - PostgresFunctionGuard(ReleaseSysCache, tuple); - return nullptr; + size_t i = 0; + std::lock_guard lock(GlobalProcessLock::GetLock()); + for (; i < STANDARD_VECTOR_SIZE; i++) { + TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple(); + if (pgduckdb::TupleIsNull(slot)) { + local_state.global_state->table_reader_global_state->PostgresTableReaderCleanup(); + local_state.exhausted_scan = true; + break; + } + SlotGetAllAttrs(slot); + InsertTupleIntoChunk(output, local_state, slot); } - PostgresFunctionGuard(ReleaseSysCache, tuple); - return ReplaceView(relid); + SetOutputCardinality(output, local_state); +} + +duckdb::unique_ptr +PostgresScanTableFunction::PostgresScanCardinality(duckdb::ClientContext &, const duckdb::FunctionData *data) { + auto &bind_data = data->Cast(); + return duckdb::make_uniq(bind_data.cardinality, bind_data.cardinality); } } // namespace pgduckdb diff --git a/src/scan/postgres_seq_scan.cpp b/src/scan/postgres_seq_scan.cpp deleted file mode 100644 index 3a11e18d..00000000 --- a/src/scan/postgres_seq_scan.cpp +++ /dev/null @@ -1,115 +0,0 @@ -#include "duckdb.hpp" - -#include "pgduckdb/scan/postgres_seq_scan.hpp" -#include "pgduckdb/pgduckdb_types.hpp" -#include "pgduckdb/logger.hpp" -#include "pgduckdb/scan/heap_reader.hpp" -#include "pgduckdb/pg/relations.hpp" - -#include "pgduckdb/utility/cpp_only_file.hpp" // Must be last include. - -namespace pgduckdb { - -// -// PostgresSeqScanGlobalState -// - -PostgresSeqScanGlobalState::PostgresSeqScanGlobalState(Relation rel, duckdb::TableFunctionInitInput &input) - : m_global_state(duckdb::make_shared_ptr()), - m_heap_reader_global_state(duckdb::make_shared_ptr(rel)), m_rel(rel) { - m_global_state->InitGlobalState(input); - m_global_state->m_tuple_desc = RelationGetDescr(m_rel); - m_global_state->InitRelationMissingAttrs(m_global_state->m_tuple_desc); - pd_log(DEBUG2, "(DuckDB/PostgresSeqScanGlobalState) Running %" PRIu64 " threads -- ", (uint64_t)MaxThreads()); -} - -PostgresSeqScanGlobalState::~PostgresSeqScanGlobalState() { -} - -// -// PostgresSeqScanLocalState -// - -PostgresSeqScanLocalState::PostgresSeqScanLocalState(Relation rel, - duckdb::shared_ptr heap_reder_global_state, - duckdb::shared_ptr global_state) { - m_local_state = duckdb::make_shared_ptr(global_state.get()); - m_heap_table_reader = duckdb::make_uniq(rel, heap_reder_global_state, global_state, m_local_state); -} - -PostgresSeqScanLocalState::~PostgresSeqScanLocalState() { -} - -// -// PostgresSeqScanFunctionData -// - -PostgresSeqScanFunctionData::PostgresSeqScanFunctionData(Relation rel, uint64_t cardinality, Snapshot snapshot) - : m_rel(rel), m_cardinality(cardinality), m_snapshot(snapshot) { -} - -PostgresSeqScanFunctionData::~PostgresSeqScanFunctionData() { -} - -// -// PostgresSeqScanFunction -// - -PostgresSeqScanFunction::PostgresSeqScanFunction() - : TableFunction("postgres_seq_scan", {}, PostgresSeqScanFunc, nullptr, PostgresSeqScanInitGlobal, - PostgresSeqScanInitLocal) { - named_parameters["cardinality"] = duckdb::LogicalType::UBIGINT; - named_parameters["relid"] = duckdb::LogicalType::UINTEGER; - named_parameters["snapshot"] = duckdb::LogicalType::POINTER; - projection_pushdown = true; - filter_pushdown = true; - filter_prune = true; - cardinality = PostgresSeqScanCardinality; -} - -duckdb::unique_ptr -PostgresSeqScanFunction::PostgresSeqScanInitGlobal(duckdb::ClientContext &, duckdb::TableFunctionInitInput &input) { - auto &bind_data = input.bind_data->CastNoConst(); - auto global_state = duckdb::make_uniq(bind_data.m_rel, input); - global_state->m_global_state->m_snapshot = bind_data.m_snapshot; -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wredundant-move" - return std::move(global_state); -#pragma GCC diagnostic pop -} - -duckdb::unique_ptr -PostgresSeqScanFunction::PostgresSeqScanInitLocal(duckdb::ExecutionContext &, duckdb::TableFunctionInitInput &, - duckdb::GlobalTableFunctionState *gstate) { - auto global_state = reinterpret_cast(gstate); - return duckdb::make_uniq(global_state->m_rel, global_state->m_heap_reader_global_state, - global_state->m_global_state); -} - -void -PostgresSeqScanFunction::PostgresSeqScanFunc(duckdb::ClientContext &, duckdb::TableFunctionInput &data, - duckdb::DataChunk &output) { - auto &local_state = data.local_state->Cast(); - - local_state.m_local_state->m_output_vector_size = 0; - - /* We have exhausted seq scan of heap table so we can return */ - if (local_state.m_local_state->m_exhausted_scan) { - output.SetCardinality(0); - return; - } - - auto hasTuple = local_state.m_heap_table_reader->ReadPageTuples(output); - - if (!hasTuple || !IsValidBlockNumber(local_state.m_heap_table_reader->GetCurrentBlockNumber())) { - local_state.m_local_state->m_exhausted_scan = true; - } -} - -duckdb::unique_ptr -PostgresSeqScanFunction::PostgresSeqScanCardinality(duckdb::ClientContext &, const duckdb::FunctionData *data) { - auto &bind_data = data->Cast(); - return duckdb::make_uniq(bind_data.m_cardinality, bind_data.m_cardinality); -} - -} // namespace pgduckdb diff --git a/src/scan/postgres_table_reader.cpp b/src/scan/postgres_table_reader.cpp new file mode 100644 index 00000000..ff84fac5 --- /dev/null +++ b/src/scan/postgres_table_reader.cpp @@ -0,0 +1,334 @@ +#include "pgduckdb/scan/postgres_table_reader.hpp" +#include "pgduckdb/pgduckdb_process_lock.hpp" +#include "pgduckdb/pgduckdb_utils.hpp" + +extern "C" { +#include "postgres.h" +#include "miscadmin.h" +#include "access/xact.h" +#include "commands/explain.h" +#include "executor/executor.h" +#include "executor/execParallel.h" +#include "executor/tqueue.h" +#include "optimizer/planmain.h" +#include "optimizer/planner.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/rel.h" +#include "utils/snapmgr.h" +#include "utils/wait_event.h" + +#include "pgduckdb/pgduckdb_guc.h" +} + +#include "pgduckdb/vendor/pg_list.hpp" + +#include + +namespace pgduckdb { + +PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool count_tuples_only) + : parallel_executor_info(nullptr), parallel_worker_readers(nullptr), nreaders(0), next_parallel_reader(0), + entered_parallel_mode(false), cleaned_up(false) { + + std::lock_guard lock(GlobalProcessLock::GetLock()); + PostgresScopedStackReset scoped_stack_reset; + + List *raw_parsetree_list = PostgresFunctionGuard(pg_parse_query, table_scan_query); + Assert(list_length(raw_parsetree_list) == 1); + RawStmt *raw_parsetree = linitial_node(RawStmt, raw_parsetree_list); + +#if PG_VERSION_NUM >= 150000 + List *query_list = + PostgresFunctionGuard(pg_analyze_and_rewrite_fixedparams, raw_parsetree, table_scan_query, nullptr, 0, nullptr); +#else + List *query_list = + PostgresFunctionGuard(pg_analyze_and_rewrite, raw_parsetree, table_scan_query, nullptr, 0, nullptr); +#endif + + Assert(list_length(query_list) == 1); + Query *query = linitial_node(Query, query_list); + + Assert(list_length(query->rtable) == 1); + RangeTblEntry *rte = linitial_node(RangeTblEntry, query->rtable); + + char persistence = get_rel_persistence(rte->relid); + + PlannedStmt *planned_stmt = PostgresFunctionGuard(standard_planner, query, table_scan_query, 0, nullptr); + + table_scan_query_desc = PostgresFunctionGuard(CreateQueryDesc, planned_stmt, table_scan_query, GetActiveSnapshot(), + InvalidSnapshot, None_Receiver, nullptr, nullptr, 0); + + PostgresFunctionGuard(ExecutorStart, table_scan_query_desc, 0); + + table_scan_planstate = + PostgresFunctionGuard(ExecInitNode, planned_stmt->planTree, table_scan_query_desc->estate, 0); + + bool run_scan_with_parallel_workers = persistence != RELPERSISTENCE_TEMP; + run_scan_with_parallel_workers &= CanTableScanRunInParallel(table_scan_query_desc->planstate->plan); + + /* Temp tables can be excuted with parallel workers, and whole plan should be parallel aware */ + if (run_scan_with_parallel_workers) { + + int parallel_workers = 0; + if (count_tuples_only) { + /* For count_tuples_only we will try to execute aggregate node on table scan */ + planned_stmt->planTree->parallel_aware = true; + MarkPlanParallelAware((Plan *)table_scan_query_desc->planstate->plan->lefttree); + parallel_workers = ParallelWorkerNumber(planned_stmt->planTree->lefttree->plan_rows); + } else { + MarkPlanParallelAware(table_scan_query_desc->planstate->plan); + parallel_workers = ParallelWorkerNumber(planned_stmt->planTree->plan_rows); + } + + bool interrupts_can_be_process = INTERRUPTS_CAN_BE_PROCESSED(); + + if (!interrupts_can_be_process) { + RESUME_CANCEL_INTERRUPTS(); + } + + if (!IsInParallelMode()) { + EnterParallelMode(); + entered_parallel_mode = true; + } + + ParallelContext *pcxt; + parallel_executor_info = PostgresFunctionGuard(ExecInitParallelPlan, table_scan_planstate, + table_scan_query_desc->estate, nullptr, parallel_workers, -1); + pcxt = parallel_executor_info->pcxt; + PostgresFunctionGuard(LaunchParallelWorkers, pcxt); + nworkers_launched = pcxt->nworkers_launched; + + if (pcxt->nworkers_launched > 0) { + PostgresFunctionGuard(ExecParallelCreateReaders, parallel_executor_info); + nreaders = pcxt->nworkers_launched; + parallel_worker_readers = (void **)palloc(nreaders * sizeof(TupleQueueReader *)); + memcpy(parallel_worker_readers, parallel_executor_info->reader, nreaders * sizeof(TupleQueueReader *)); + } + + if (!interrupts_can_be_process) { + HOLD_CANCEL_INTERRUPTS(); + } + } + + elog(DEBUG1, "(PGDuckdDB/PostgresTableReader)\n\nQUERY: %s\nRUNNING: %s.\nEXECUTING: \n%s", table_scan_query, + !nreaders ? "IN PROCESS THREAD" : psprintf("ON %d PARALLEL WORKER(S)", nreaders), + ExplainScanPlan(table_scan_query_desc)); + + slot = PostgresFunctionGuard(ExecInitExtraTupleSlot, table_scan_query_desc->estate, + table_scan_planstate->ps_ResultTupleDesc, &TTSOpsMinimalTuple); +} + +PostgresTableReader::~PostgresTableReader() { + if (cleaned_up) { + return; + } + std::lock_guard lock(GlobalProcessLock::GetLock()); + PostgresTableReaderCleanup(); +} + +void +PostgresTableReader::PostgresTableReaderCleanup() { + D_ASSERT(!cleaned_up); + cleaned_up = true; + PostgresScopedStackReset scoped_stack_reset; + + if (table_scan_planstate) { + PostgresFunctionGuard(ExecEndNode, table_scan_planstate); + table_scan_planstate = nullptr; + } + + if (parallel_executor_info != NULL) { + PostgresFunctionGuard(ExecParallelFinish, parallel_executor_info); + PostgresFunctionGuard(ExecParallelCleanup, parallel_executor_info); + parallel_executor_info = nullptr; + } + + if (parallel_worker_readers) { + PostgresFunctionGuard(pfree, parallel_worker_readers); + parallel_worker_readers = nullptr; + } + + if (table_scan_query_desc) { + PostgresFunctionGuard(ExecutorFinish, table_scan_query_desc); + PostgresFunctionGuard(ExecutorEnd, table_scan_query_desc); + PostgresFunctionGuard(FreeQueryDesc, table_scan_query_desc); + table_scan_query_desc = nullptr; + } + + if (entered_parallel_mode) { + ExitParallelMode(); + entered_parallel_mode = false; + } +} + +/* + * Logic is straightforward, if `duckdb_max_workers_per_postgres_scan` is set to 0 we don't want any + * parallelization. For cardinality less equal than 2^16 we only try to run one parallel process. When cardinality + * is bigger than we should spawn numer of parallel processes set by `duckdb_max_workers_per_postgres_scan` but + * not bigger than `max_parallel_workers`. + */ + +int +PostgresTableReader::ParallelWorkerNumber(Cardinality cardinality) { + static const int cardinality_threshold = 1 << 16; + /* No parallel worker scan wanted */ + if (!duckdb_max_workers_per_postgres_scan) { + return 0; + } + /* Use only one worker when scan is done on low cardinality */ + if (cardinality <= cardinality_threshold) { + return 1; + } + return std::min(duckdb_max_workers_per_postgres_scan, max_parallel_workers); +} + +const char * +ExplainScanPlan_Unsafe(QueryDesc *query_desc) { + ExplainState *es = (ExplainState *)palloc0(sizeof(ExplainState)); + es->str = makeStringInfo(); + es->format = EXPLAIN_FORMAT_TEXT; + ExplainPrintPlan(es, query_desc); + return es->str->data; +} + +const char * +PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) { + return PostgresFunctionGuard(ExplainScanPlan_Unsafe, query_desc); +} + +bool +PostgresTableReader::CanTableScanRunInParallel(Plan *plan) { + switch (nodeTag(plan)) { + case T_SeqScan: + case T_IndexScan: + case T_IndexOnlyScan: + case T_BitmapHeapScan: + return true; + case T_Append: { + ListCell *l; + foreach (l, ((Append *)plan)->appendplans) { + if (!CanTableScanRunInParallel((Plan *)lfirst(l))) { + return false; + } + } + return true; + } + /* This is special case for COUNT(*) */ + case T_Agg: + return true; + default: + return false; + } +} + +bool +PostgresTableReader::MarkPlanParallelAware(Plan *plan) { + switch (nodeTag(plan)) { + case T_SeqScan: + case T_IndexScan: + case T_IndexOnlyScan: { + plan->parallel_aware = true; + return true; + } + case T_Append: { + plan->parallel_aware = true; + return true; + } + case T_BitmapHeapScan: { + plan->parallel_aware = true; + return MarkPlanParallelAware(plan->lefttree); + } + case T_BitmapIndexScan: { + ((BitmapIndexScan *)plan)->isshared = true; + return true; + } + case T_BitmapAnd: { + return MarkPlanParallelAware((Plan *)linitial(((BitmapAnd *)plan)->bitmapplans)); + } + case T_BitmapOr: { + ((BitmapOr *)plan)->isshared = true; + return MarkPlanParallelAware((Plan *)linitial(((BitmapOr *)plan)->bitmapplans)); + } + default: { + std::ostringstream oss; + oss << "Unknown postgres scan query plan node: " << nodeTag(plan); + throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, oss.str().c_str()); + } + } +} + +/* + * GlobalProcessLock should be held before calling this. + */ +TupleTableSlot * +PostgresTableReader::GetNextTuple() { + MinimalTuple worker_minmal_tuple; + TupleTableSlot *thread_scan_slot; + if (nreaders > 0) { + worker_minmal_tuple = GetNextWorkerTuple(); + if (HeapTupleIsValid(worker_minmal_tuple)) { + PostgresFunctionGuard(ExecStoreMinimalTuple, worker_minmal_tuple, slot, false); + return slot; + } + } else { + PostgresScopedStackReset scoped_stack_reset; + table_scan_query_desc->estate->es_query_dsa = parallel_executor_info ? parallel_executor_info->area : NULL; + thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate); + table_scan_query_desc->estate->es_query_dsa = NULL; + if (!TupIsNull(thread_scan_slot)) { + return thread_scan_slot; + } + } + + return PostgresFunctionGuard(ExecClearTuple, slot); +} + +MinimalTuple +PostgresTableReader::GetNextWorkerTuple() { + int nvisited = 0; + for (;;) { + TupleQueueReader *reader; + MinimalTuple minimal_tuple; + bool readerdone; + + reader = (TupleQueueReader *)parallel_worker_readers[next_parallel_reader]; + + minimal_tuple = PostgresFunctionGuard(TupleQueueReaderNext, reader, true, &readerdone); + + if (readerdone) { + --nreaders; + if (nreaders == 0) { + return NULL; + } + memmove(¶llel_worker_readers[next_parallel_reader], ¶llel_worker_readers[next_parallel_reader + 1], + sizeof(TupleQueueReader *) * (nreaders - next_parallel_reader)); + if (next_parallel_reader >= nreaders) { + next_parallel_reader = 0; + } + continue; + } + + if (minimal_tuple) { + return minimal_tuple; + } + + next_parallel_reader++; + if (next_parallel_reader >= nreaders) + next_parallel_reader = 0; + + nvisited++; + if (nvisited >= nreaders) { + /* + * It should be safe to make this call because function calling GetNextTuple() and transitively + * GetNextWorkerTuple() should held GlobalProcesLock. + */ + PostgresFunctionGuard(WaitLatch, MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, PG_WAIT_EXTENSION); + /* No need to use PostgresFunctionGuard here, because ResetLatch is a trivial function */ + ResetLatch(MyLatch); + nvisited = 0; + } + } +} + +} // namespace pgduckdb diff --git a/test/pycheck/explain_test.py b/test/pycheck/explain_test.py index e84e440b..b9347e2b 100644 --- a/test/pycheck/explain_test.py +++ b/test/pycheck/explain_test.py @@ -59,21 +59,21 @@ def test_explain_ctas(cur: Cursor): cur.sql("CREATE TEMP TABLE heap1(id) AS SELECT 1") result = cur.sql("EXPLAIN CREATE TEMP TABLE heap2(id) AS SELECT * from heap1") plan = "\n".join(result) - assert "POSTGRES_SEQ_SCAN" in plan + assert "POSTGRES_SCAN" in plan assert "Total Time:" not in plan result = cur.sql( "EXPLAIN ANALYZE CREATE TEMP TABLE heap2(id) AS SELECT * from heap1" ) plan = "\n".join(result) - assert "POSTGRES_SEQ_SCAN" in plan + assert "POSTGRES_SCAN" in plan assert "Total Time:" in plan result = cur.sql( "EXPLAIN CREATE TEMP TABLE duckdb1(id) USING duckdb AS SELECT * from heap1" ) plan = "\n".join(result) - assert "POSTGRES_SEQ_SCAN" in plan + assert "POSTGRES_SCAN" in plan assert "Total Time:" not in plan # EXPLAIN ANALYZE is not supported for DuckDB CTAS (yet) diff --git a/test/regression/expected/basic.out b/test/regression/expected/basic.out index 85e3391f..40e859a6 100644 --- a/test/regression/expected/basic.out +++ b/test/regression/expected/basic.out @@ -15,22 +15,6 @@ SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; 9 | 100 (4 rows) -SET duckdb.max_threads_per_postgres_scan to 4; -SELECT COUNT(*) FROM t; - count -------- - 1000 -(1 row) - -SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; - a | count ----+------- - 6 | 100 - 7 | 100 - 8 | 100 - 9 | 100 -(4 rows) - CREATE TABLE empty(a INT); SELECT COUNT(*) FROM empty; count @@ -38,7 +22,6 @@ SELECT COUNT(*) FROM empty; 0 (1 row) -SET duckdb.max_threads_per_postgres_scan TO default; SET client_min_messages TO default; DROP TABLE t; DROP TABLE empty; diff --git a/test/regression/expected/duckdb_recycle.out b/test/regression/expected/duckdb_recycle.out index fb9628a6..723cf523 100644 --- a/test/regression/expected/duckdb_recycle.out +++ b/test/regression/expected/duckdb_recycle.out @@ -13,16 +13,15 @@ EXPLAIN SELECT count(*) FROM ta; │ count_star() │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ - │ POSTGRES_SEQ_SCAN │ + │ POSTGRES_SCAN │ │ ──────────────────── │ - │ Function: │ - │ POSTGRES_SEQ_SCAN │ + │ (POSTGRES_SCAN) ta │ │ │ │ ~2550 Rows │ └───────────────────────────┘ -(19 rows) +(18 rows) CALL duckdb.recycle_ddb(); EXPLAIN SELECT count(*) FROM ta; @@ -38,16 +37,15 @@ EXPLAIN SELECT count(*) FROM ta; │ count_star() │ └─────────────┬─────────────┘ ┌─────────────┴─────────────┐ - │ POSTGRES_SEQ_SCAN │ + │ POSTGRES_SCAN │ │ ──────────────────── │ - │ Function: │ - │ POSTGRES_SEQ_SCAN │ + │ (POSTGRES_SCAN) ta │ │ │ │ ~2550 Rows │ └───────────────────────────┘ -(19 rows) +(18 rows) -- Not allowed in a transaction BEGIN; diff --git a/test/regression/expected/scan_postgres_tables.out b/test/regression/expected/scan_postgres_tables.out new file mode 100644 index 00000000..b6cca102 --- /dev/null +++ b/test/regression/expected/scan_postgres_tables.out @@ -0,0 +1,255 @@ +CREATE TABLE t1(a INT, b INT, c TEXT); +INSERT INTO t1 SELECT g, g % 100, 'scan_potgres_table_'||g from generate_series(1,100000) g; +SET client_min_messages TO DEBUG1; +-- COUNT(*) +SELECT COUNT(*) FROM t1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT COUNT(*) FROM public.t1 +RUNNING: ON 2 PARALLEL WORKER(S). +EXECUTING: +Parallel Aggregate + -> Parallel Seq Scan on t1 + + count +-------- + 100000 +(1 row) + +-- SEQ SCAN +SELECT COUNT(a) FROM t1 WHERE a < 10; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a < 10)) + + count +------- + 9 +(1 row) + +-- CREATE INDEX on t1 +SET client_min_messages TO DEFAULT; +CREATE INDEX ON t1(a); +SET client_min_messages TO DEBUG1; +-- BITMAP INDEX +SELECT COUNT(a) FROM t1 WHERE a < 10; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Bitmap Heap Scan on t1 + Recheck Cond: ((a < 10) AND (a IS NOT NULL)) + -> Bitmap Index Scan on t1_a_idx + Index Cond: ((a < 10) AND (a IS NOT NULL)) + + count +------- + 9 +(1 row) + +-- INDEXONLYSCAN +SET enable_bitmapscan TO false; +SELECT COUNT(a) FROM t1 WHERE a = 1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a=1 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Only Scan using t1_a_idx on t1 + Index Cond: ((a IS NOT NULL) AND (a = 1)) + + count +------- + 1 +(1 row) + +-- INDEXSCAN +SELECT COUNT(c) FROM t1 WHERE a = 1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT c FROM public.t1 WHERE (a=1 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Scan using t1_a_idx on t1 + Index Cond: ((a IS NOT NULL) AND (a = 1)) + + count +------- + 1 +(1 row) + +-- TEMPORARY TABLES JOIN WITH HEAP TABLES +SET client_min_messages TO DEFAULT; +CREATE TEMP TABLE t2(a int); +INSERT INTO t2 VALUES (1), (2), (3); +SET client_min_messages TO DEBUG1; +SELECT t1.a, t2.a FROM t1, t2 WHERE t1.a = t2.a; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM pg_temp.t2 +RUNNING: IN PROCESS THREAD. +EXECUTING: +Seq Scan on t2 + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a>=1 AND a<=3 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Only Scan using t1_a_idx on t1 + Index Cond: ((a >= 1) AND (a <= 3) AND (a IS NOT NULL)) + + a | a +---+--- + 1 | 1 + 2 | 2 + 3 | 3 +(3 rows) + +-- JOIN WITH SAME TABLE (on WORKERS) +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a < 2)) + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a > 8)) + + count +------- + 99992 +(1 row) + +-- JOIN WITH SAME TABLE (in BACKEND PROCESS) +SET max_parallel_workers TO 0; +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL) +RUNNING: IN PROCESS THREAD. +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a < 2)) + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL) +RUNNING: IN PROCESS THREAD. +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a > 8)) + + count +------- + 99992 +(1 row) + +SET max_parallel_workers TO DEFAULT; +-- PARTITIONED TABLE +SET client_min_messages TO DEFAULT; +CREATE TABLE partitioned_table(a int, b INT, c text) PARTITION BY RANGE (a); +CREATE TABLE partition_1 PARTITION OF partitioned_table FOR VALUES FROM (0) TO (50); +CREATE TABLE partition_2 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (100); +INSERT INTO partitioned_table SELECT g % 100, g, 'abcde_'||g from generate_series(1,100000) g; +CREATE INDEX ON partitioned_table(b); +SET client_min_messages TO DEBUG1; +SELECT COUNT(*) FROM partitioned_table WHERE a < 25; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.partitioned_table WHERE (a<25 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on partition_1 partitioned_table + Filter: ((a IS NOT NULL) AND (a < 25)) + + count +------- + 25000 +(1 row) + +SELECT COUNT(*) FROM partitioned_table WHERE a < 75; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.partitioned_table WHERE (a<75 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Append + -> Seq Scan on partition_1 partitioned_table_1 + Filter: ((a IS NOT NULL) AND (a < 75)) + -> Seq Scan on partition_2 partitioned_table_2 + Filter: ((a IS NOT NULL) AND (a < 75)) + + count +------- + 75000 +(1 row) + +SELECT COUNT(*) FROM partitioned_table WHERE a < 25 OR a > 75; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.partitioned_table +RUNNING: ON 2 PARALLEL WORKER(S). +EXECUTING: +Parallel Append + -> Seq Scan on partition_1 partitioned_table_1 + -> Seq Scan on partition_2 partitioned_table_2 + + count +------- + 49000 +(1 row) + +SELECT COUNT(*) FROM partitioned_table WHERE a < 25 AND b = 1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a, b FROM public.partitioned_table WHERE (a<25 AND a IS NOT NULL) AND (b=1 AND b IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Scan using partition_1_b_idx on partition_1 partitioned_table + Index Cond: ((b IS NOT NULL) AND (b = 1)) + Filter: ((a IS NOT NULL) AND (a < 25)) + + count +------- + 1 +(1 row) + +SELECT COUNT(*) FROM partitioned_table, t2 WHERE partitioned_table.a = t2.a AND partitioned_table.a < 2; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.partitioned_table WHERE (a<2 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on partition_1 partitioned_table + Filter: ((a IS NOT NULL) AND (a < 2)) + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM pg_temp.t2 WHERE (a<2 AND a IS NOT NULL AND a>=0 AND a<=1 AND a IS NOT NULL) +RUNNING: IN PROCESS THREAD. +EXECUTING: +Seq Scan on t2 + Filter: ((a IS NOT NULL) AND (a IS NOT NULL) AND (a < 2) AND (a >= 0) AND (a <= 1)) + + count +------- + 1000 +(1 row) + +SET enable_bitmapscan TO DEFAULT; +SET client_min_messages TO DEFAULT; +DROP TABLE t1, t2, partitioned_table; diff --git a/test/regression/schedule b/test/regression/schedule index eec4208b..18d43899 100644 --- a/test/regression/schedule +++ b/test/regression/schedule @@ -28,3 +28,4 @@ test: prepare test: function test: timestamp_with_interval test: approx_count_distinct +test: scan_postgres_tables diff --git a/test/regression/sql/basic.sql b/test/regression/sql/basic.sql index 614773cd..aa69d83e 100644 --- a/test/regression/sql/basic.sql +++ b/test/regression/sql/basic.sql @@ -5,15 +5,9 @@ INSERT INTO t SELECT g % 10 from generate_series(1,1000) g; SELECT COUNT(*) FROM t; SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; -SET duckdb.max_threads_per_postgres_scan to 4; - -SELECT COUNT(*) FROM t; -SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; - CREATE TABLE empty(a INT); SELECT COUNT(*) FROM empty; -SET duckdb.max_threads_per_postgres_scan TO default; SET client_min_messages TO default; DROP TABLE t; diff --git a/test/regression/sql/scan_postgres_tables.sql b/test/regression/sql/scan_postgres_tables.sql new file mode 100644 index 00000000..efff9649 --- /dev/null +++ b/test/regression/sql/scan_postgres_tables.sql @@ -0,0 +1,63 @@ +CREATE TABLE t1(a INT, b INT, c TEXT); +INSERT INTO t1 SELECT g, g % 100, 'scan_potgres_table_'||g from generate_series(1,100000) g; + +SET client_min_messages TO DEBUG1; + +-- COUNT(*) +SELECT COUNT(*) FROM t1; + +-- SEQ SCAN +SELECT COUNT(a) FROM t1 WHERE a < 10; + +-- CREATE INDEX on t1 +SET client_min_messages TO DEFAULT; +CREATE INDEX ON t1(a); +SET client_min_messages TO DEBUG1; + +-- BITMAP INDEX +SELECT COUNT(a) FROM t1 WHERE a < 10; + +-- INDEXONLYSCAN +SET enable_bitmapscan TO false; +SELECT COUNT(a) FROM t1 WHERE a = 1; + +-- INDEXSCAN +SELECT COUNT(c) FROM t1 WHERE a = 1; + +-- TEMPORARY TABLES JOIN WITH HEAP TABLES +SET client_min_messages TO DEFAULT; +CREATE TEMP TABLE t2(a int); +INSERT INTO t2 VALUES (1), (2), (3); +SET client_min_messages TO DEBUG1; + +SELECT t1.a, t2.a FROM t1, t2 WHERE t1.a = t2.a; + +-- JOIN WITH SAME TABLE (on WORKERS) +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; + +-- JOIN WITH SAME TABLE (in BACKEND PROCESS) +SET max_parallel_workers TO 0; +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; +SET max_parallel_workers TO DEFAULT; + + +-- PARTITIONED TABLE + +SET client_min_messages TO DEFAULT; +CREATE TABLE partitioned_table(a int, b INT, c text) PARTITION BY RANGE (a); +CREATE TABLE partition_1 PARTITION OF partitioned_table FOR VALUES FROM (0) TO (50); +CREATE TABLE partition_2 PARTITION OF partitioned_table FOR VALUES FROM (50) TO (100); +INSERT INTO partitioned_table SELECT g % 100, g, 'abcde_'||g from generate_series(1,100000) g; +CREATE INDEX ON partitioned_table(b); +SET client_min_messages TO DEBUG1; + +SELECT COUNT(*) FROM partitioned_table WHERE a < 25; +SELECT COUNT(*) FROM partitioned_table WHERE a < 75; +SELECT COUNT(*) FROM partitioned_table WHERE a < 25 OR a > 75; +SELECT COUNT(*) FROM partitioned_table WHERE a < 25 AND b = 1; +SELECT COUNT(*) FROM partitioned_table, t2 WHERE partitioned_table.a = t2.a AND partitioned_table.a < 2; + + +SET enable_bitmapscan TO DEFAULT; +SET client_min_messages TO DEFAULT; +DROP TABLE t1, t2, partitioned_table;