From de068c621bf09604b7b353c819d1fbf6a384648c Mon Sep 17 00:00:00 2001 From: mkaruza Date: Sat, 14 Dec 2024 11:27:10 +0100 Subject: [PATCH] Regression test for native postgres scan. Handle COUNT(*) in batches. --- .../pgduckdb/scan/postgres_table_reader.hpp | 5 +- src/scan/postgres_scan.cpp | 30 ++-- src/scan/postgres_table_reader.cpp | 65 ++++++- test/regression/expected/basic.out | 17 -- .../expected/scan_postgres_tables.out | 164 ++++++++++++++++++ test/regression/sql/basic.sql | 6 - test/regression/sql/scan_postgres_tables.sql | 46 +++++ 7 files changed, 290 insertions(+), 43 deletions(-) create mode 100644 test/regression/expected/scan_postgres_tables.out create mode 100644 test/regression/sql/scan_postgres_tables.sql diff --git a/include/pgduckdb/scan/postgres_table_reader.hpp b/include/pgduckdb/scan/postgres_table_reader.hpp index a142ab2b..cbd009b2 100644 --- a/include/pgduckdb/scan/postgres_table_reader.hpp +++ b/include/pgduckdb/scan/postgres_table_reader.hpp @@ -12,14 +12,15 @@ namespace pgduckdb { class PostgresTableReader { public: - PostgresTableReader(const char *table_scan_query); + PostgresTableReader(const char *table_scan_query, bool count_tuples_only); ~PostgresTableReader(); TupleTableSlot *GetNextTuple(); private: MinimalTuple GetNextWorkerTuple(); int ParallelWorkerNumber(Cardinality cardinality); - + std::string ExplainScanPlan(QueryDesc *query_desc); + bool MarkPlanParallelAware(Plan *plan); private: QueryDesc *table_scan_query_desc; PlanState *table_scan_planstate; diff --git a/src/scan/postgres_scan.cpp b/src/scan/postgres_scan.cpp index 3af7df43..ee7a58bf 100644 --- a/src/scan/postgres_scan.cpp +++ b/src/scan/postgres_scan.cpp @@ -108,10 +108,6 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput scan_query << ") "; } } - - scan_query << ";"; - - pd_log(DEBUG1, "scan_query: %s", scan_query.str().c_str()); } std::string @@ -124,7 +120,8 @@ PostgresScanGlobalState::PostgresScanGlobalState(Snapshot snapshot, Relation rel : snapshot(snapshot), rel(rel), table_tuple_desc(RelationGetDescr(rel)), count_tuples_only(false), total_row_count(0) { ConstructTableScanQuery(input); - table_reader_global_state = duckdb::make_shared_ptr(scan_query.str().c_str()); + table_reader_global_state = + duckdb::make_shared_ptr(scan_query.str().c_str(), count_tuples_only); pd_log(DEBUG2, "(DuckDB/PostgresSeqScanGlobalState) Running %" PRIu64 " threads -- ", (uint64_t)MaxThreads()); } @@ -183,20 +180,27 @@ PostgresScanTableFunction::PostgresScanInitLocal(__attribute__((unused)) duckdb: auto global_state = reinterpret_cast(gstate); return duckdb::make_uniq(global_state); } +void +SetOutputCardinality(duckdb::DataChunk &output, PostgresScanLocalState &local_state) { + idx_t output_cardinality = + local_state.output_vector_size <= STANDARD_VECTOR_SIZE ? 
local_state.output_vector_size : STANDARD_VECTOR_SIZE; + output.SetCardinality(output_cardinality); + local_state.output_vector_size -= output_cardinality; +} void PostgresScanTableFunction::PostgresScanFunction(__attribute__((unused)) duckdb::ClientContext &context, duckdb::TableFunctionInput &data, duckdb::DataChunk &output) { auto &local_state = data.local_state->Cast(); - local_state.output_vector_size = 0; - - /* We have exhausted seq scan of heap table so we can return */ + /* We have exhausted table scan */ if (local_state.exhausted_scan) { - output.SetCardinality(0); + SetOutputCardinality(output, local_state); return; } + local_state.output_vector_size = 0; + int i = 0; for (; i < STANDARD_VECTOR_SIZE; i++) { TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple(); @@ -208,16 +212,12 @@ PostgresScanTableFunction::PostgresScanFunction(__attribute__((unused)) duckdb:: InsertTupleIntoChunk(output, local_state, slot); } + /* If we finish before reading complete vector means that scan was exhausted. */ if (i != STANDARD_VECTOR_SIZE) { local_state.exhausted_scan = true; } - /* Special case when we only use COUNT(*). We only update output capactity. */ - if (local_state.global_state->count_tuples_only && local_state.output_vector_size < STANDARD_VECTOR_SIZE) { - output.SetCapacity(local_state.output_vector_size); - } - - output.SetCardinality(local_state.output_vector_size); + SetOutputCardinality(output, local_state); } duckdb::unique_ptr diff --git a/src/scan/postgres_table_reader.cpp b/src/scan/postgres_table_reader.cpp index 3a42db33..e56610e8 100644 --- a/src/scan/postgres_table_reader.cpp +++ b/src/scan/postgres_table_reader.cpp @@ -7,6 +7,7 @@ extern "C" { #include "postgres.h" #include "miscadmin.h" #include "access/xact.h" +#include "commands/explain.h" #include "executor/executor.h" #include "executor/execParallel.h" #include "executor/tqueue.h" @@ -24,7 +25,7 @@ extern "C" { namespace pgduckdb { -PostgresTableReader::PostgresTableReader(const char *table_scan_query) +PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool count_tuples_only) : parallel_executor_info(nullptr), parallel_worker_readers(nullptr), nreaders(0), next_parallel_reader(0), entered_parallel_mode(false) { std::lock_guard lock(GlobalProcessLock::GetLock()); @@ -55,10 +56,20 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query) table_scan_planstate = PostgresFunctionGuard(ExecInitNode, planned_stmt->planTree, table_scan_query_desc->estate, 0); - /* Temp tables can be excuted with parallel workers */ + bool marked_parallel_aware = false; + if (persistence != RELPERSISTENCE_TEMP) { + if (count_tuples_only) { + /* For count_tuples_only we will try to execute aggregate node on table scan */ + planned_stmt->planTree->parallel_aware = true; + marked_parallel_aware = MarkPlanParallelAware((Plan *)table_scan_query_desc->planstate->plan->lefttree); + } else { + marked_parallel_aware = MarkPlanParallelAware(table_scan_query_desc->planstate->plan); + } + } - table_scan_query_desc->planstate->plan->parallel_aware = true; + /* Temp tables can be excuted with parallel workers, and whole plan should be parallel aware */ + if (persistence != RELPERSISTENCE_TEMP && marked_parallel_aware) { int parallel_workers = ParallelWorkerNumber(planned_stmt->planTree->plan_rows); @@ -86,6 +97,10 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query) HOLD_CANCEL_INTERRUPTS(); } + elog(DEBUG1, "(PGDuckdDB/PostgresTableReader)\n\nQUERY: 
%s\nRUNNING: %s.\nEXECUTING: \n%s", table_scan_query, + !nreaders ? "IN PROCESS THREAD" : psprintf("ON %d PARALLEL WORKER(S)", nreaders), + ExplainScanPlan(table_scan_query_desc).c_str()); + slot = PostgresFunctionGuard(ExecInitExtraTupleSlot, table_scan_query_desc->estate, table_scan_planstate->ps_ResultTupleDesc, &TTSOpsMinimalTuple); } @@ -127,6 +142,50 @@ PostgresTableReader::ParallelWorkerNumber(Cardinality cardinality) { return std::max(1, std::min(base, std::max(max_workers_per_postgres_scan, max_parallel_workers))); } +std::string +PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) { + ExplainState *es = (ExplainState *)PostgresFunctionGuard(palloc0, sizeof(ExplainState)); + es->str = makeStringInfo(); + es->format = EXPLAIN_FORMAT_TEXT; + PostgresFunctionGuard(ExplainPrintPlan, es, query_desc); + // // Remove new line char from explain output + // es->str->data[es->str->len - 1] = 0; + std::string explain_scan(es->str->data); + return explain_scan; +} + +bool +PostgresTableReader::MarkPlanParallelAware(Plan *plan) { + switch (nodeTag(plan)) { + case T_SeqScan: + case T_IndexScan: + case T_IndexOnlyScan: { + plan->parallel_aware = true; + return true; + } + case T_BitmapHeapScan: { + plan->parallel_aware = true; + return MarkPlanParallelAware(plan->lefttree); + } + case T_BitmapIndexScan: { + ((BitmapIndexScan *)plan)->isshared = true; + return true; + } + case T_BitmapAnd: { + return MarkPlanParallelAware((Plan *)linitial(((BitmapAnd *)plan)->bitmapplans)); + } + case T_BitmapOr: { + ((BitmapOr *)plan)->isshared = true; + return MarkPlanParallelAware((Plan *)linitial(((BitmapOr *)plan)->bitmapplans)); + } + default: { + std::ostringstream oss; + oss << "Unknown postgres scan query plan node: " << nodeTag(plan); + throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, oss.str().c_str()); + } + } +} + TupleTableSlot * PostgresTableReader::GetNextTuple() { MinimalTuple worker_minmal_tuple; diff --git a/test/regression/expected/basic.out b/test/regression/expected/basic.out index 85e3391f..40e859a6 100644 --- a/test/regression/expected/basic.out +++ b/test/regression/expected/basic.out @@ -15,22 +15,6 @@ SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; 9 | 100 (4 rows) -SET duckdb.max_threads_per_postgres_scan to 4; -SELECT COUNT(*) FROM t; - count -------- - 1000 -(1 row) - -SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; - a | count ----+------- - 6 | 100 - 7 | 100 - 8 | 100 - 9 | 100 -(4 rows) - CREATE TABLE empty(a INT); SELECT COUNT(*) FROM empty; count @@ -38,7 +22,6 @@ SELECT COUNT(*) FROM empty; 0 (1 row) -SET duckdb.max_threads_per_postgres_scan TO default; SET client_min_messages TO default; DROP TABLE t; DROP TABLE empty; diff --git a/test/regression/expected/scan_postgres_tables.out b/test/regression/expected/scan_postgres_tables.out new file mode 100644 index 00000000..d98097c5 --- /dev/null +++ b/test/regression/expected/scan_postgres_tables.out @@ -0,0 +1,164 @@ +CREATE TABLE t1(a INT, b INT, c TEXT); +INSERT INTO t1 SELECT g, g % 100, 'scan_potgres_table_'||g from generate_series(1,100000) g; +SET client_min_messages TO DEBUG1; +-- COUNT(*) +SELECT COUNT(*) FROM t1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT COUNT(*) FROM public.t1 +RUNNING: ON 1 PARALLEL WORKER(S). 
+EXECUTING: +Parallel Aggregate + -> Parallel Seq Scan on t1 + + count +-------- + 100000 +(1 row) + +-- SEQ SCAN +SELECT COUNT(a) FROM t1 WHERE a < 10; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a < 10)) + + count +------- + 9 +(1 row) + +-- CREATE INDEX on t1 +SET client_min_messages TO DEFAULT; +CREATE INDEX ON t1(a); +SET client_min_messages TO DEBUG1; +-- BITMAP INDEX +SELECT COUNT(a) FROM t1 WHERE a < 10; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Bitmap Heap Scan on t1 + Recheck Cond: ((a < 10) AND (a IS NOT NULL)) + -> Bitmap Index Scan on t1_a_idx + Index Cond: ((a < 10) AND (a IS NOT NULL)) + + count +------- + 9 +(1 row) + +-- INDEXONLYSCAN +SET enable_bitmapscan TO false; +SELECT COUNT(a) FROM t1 WHERE a = 1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a=1 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Only Scan using t1_a_idx on t1 + Index Cond: ((a IS NOT NULL) AND (a = 1)) + + count +------- + 1 +(1 row) + +-- INDEXSCAN +SELECT COUNT(c) FROM t1 WHERE a = 1; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT c FROM public.t1 WHERE (a=1 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Scan using t1_a_idx on t1 + Index Cond: ((a IS NOT NULL) AND (a = 1)) + + count +------- + 1 +(1 row) + +-- TEMPORARY TABLES JOIN WITH HEAP TABLES +SET client_min_messages TO DEFAULT; +CREATE TEMP TABLE t2(a int); +INSERT INTO t2 VALUES (1), (2), (3); +SET client_min_messages TO DEBUG1; +SELECT t1.a, t2.a FROM t1, t2 WHERE t1.a = t2.a; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM pg_temp.t2 +RUNNING: IN PROCESS THREAD. +EXECUTING: +Seq Scan on t2 + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a>=1 AND a<=3 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Index Only Scan using t1_a_idx on t1 + Index Cond: ((a >= 1) AND (a <= 3) AND (a IS NOT NULL)) + + a | a +---+--- + 1 | 1 + 2 | 2 + 3 | 3 +(3 rows) + +-- JOIN WITH SAME TABLE (on WORKERS) +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a < 2)) + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL) +RUNNING: ON 1 PARALLEL WORKER(S). +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a > 8)) + + count +------- + 99992 +(1 row) + +-- JOIN WITH SAME TABLE (in BACKEND PROCESS) +SET max_parallel_workers TO 0; +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL) +RUNNING: IN PROCESS THREAD. +EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a < 2)) + +DEBUG: (PGDuckdDB/PostgresTableReader) + +QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL) +RUNNING: IN PROCESS THREAD. 
+EXECUTING: +Parallel Seq Scan on t1 + Filter: ((a IS NOT NULL) AND (a > 8)) + + count +------- + 99992 +(1 row) + +SET max_parallel_workers TO DEFAULT; +SET enable_bitmapscan TO DEFAULT; +SET client_min_messages TO DEFAULT; +DROP TABLE t1, t2; diff --git a/test/regression/sql/basic.sql b/test/regression/sql/basic.sql index 614773cd..aa69d83e 100644 --- a/test/regression/sql/basic.sql +++ b/test/regression/sql/basic.sql @@ -5,15 +5,9 @@ INSERT INTO t SELECT g % 10 from generate_series(1,1000) g; SELECT COUNT(*) FROM t; SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; -SET duckdb.max_threads_per_postgres_scan to 4; - -SELECT COUNT(*) FROM t; -SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a; - CREATE TABLE empty(a INT); SELECT COUNT(*) FROM empty; -SET duckdb.max_threads_per_postgres_scan TO default; SET client_min_messages TO default; DROP TABLE t; diff --git a/test/regression/sql/scan_postgres_tables.sql b/test/regression/sql/scan_postgres_tables.sql new file mode 100644 index 00000000..faaa1d24 --- /dev/null +++ b/test/regression/sql/scan_postgres_tables.sql @@ -0,0 +1,46 @@ +CREATE TABLE t1(a INT, b INT, c TEXT); +INSERT INTO t1 SELECT g, g % 100, 'scan_potgres_table_'||g from generate_series(1,100000) g; + +SET client_min_messages TO DEBUG1; + +-- COUNT(*) +SELECT COUNT(*) FROM t1; + +-- SEQ SCAN +SELECT COUNT(a) FROM t1 WHERE a < 10; + +-- CREATE INDEX on t1 +SET client_min_messages TO DEFAULT; +CREATE INDEX ON t1(a); +SET client_min_messages TO DEBUG1; + +-- BITMAP INDEX +SELECT COUNT(a) FROM t1 WHERE a < 10; + +-- INDEXONLYSCAN +SET enable_bitmapscan TO false; +SELECT COUNT(a) FROM t1 WHERE a = 1; + +-- INDEXSCAN +SELECT COUNT(c) FROM t1 WHERE a = 1; + +-- TEMPORARY TABLES JOIN WITH HEAP TABLES +SET client_min_messages TO DEFAULT; +CREATE TEMP TABLE t2(a int); +INSERT INTO t2 VALUES (1), (2), (3); +SET client_min_messages TO DEBUG1; + +SELECT t1.a, t2.a FROM t1, t2 WHERE t1.a = t2.a; + +-- JOIN WITH SAME TABLE (on WORKERS) +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; + +-- JOIN WITH SAME TABLE (in BACKEND PROCESS) +SET max_parallel_workers TO 0; +SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8; +SET max_parallel_workers TO DEFAULT; + + +SET enable_bitmapscan TO DEFAULT; +SET client_min_messages TO DEFAULT; +DROP TABLE t1, t2;
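
Note (editor's illustration, not part of the patch): the "Handle COUNT(*) in batches" change works by letting local_state.output_vector_size accumulate the full matched-tuple count in count_tuples_only mode, which can be far larger than one DuckDB vector. The new SetOutputCardinality() then hands that count back to DuckDB at most STANDARD_VECTOR_SIZE rows per PostgresScanFunction() call, and keeps doing so on subsequent calls even after exhausted_scan is set, until the remainder reaches zero. The standalone sketch below only illustrates that draining loop; kStandardVectorSize and remaining_rows are hypothetical stand-ins for STANDARD_VECTOR_SIZE and local_state.output_vector_size, not code from the patch.

    // Illustrative sketch of the COUNT(*) batching idea (assumed names, not pg_duckdb code).
    #include <algorithm>
    #include <cstdint>
    #include <cstdio>

    static constexpr uint64_t kStandardVectorSize = 2048; // stand-in for DuckDB's STANDARD_VECTOR_SIZE

    int main() {
        // Suppose the table reader counted 100000 matching tuples for COUNT(*).
        uint64_t remaining_rows = 100000;

        // Each scan call reports at most one vector's worth of cardinality,
        // mirroring: output.SetCardinality(n); local_state.output_vector_size -= n;
        int batches = 0;
        while (remaining_rows > 0) {
            uint64_t emitted = std::min(remaining_rows, kStandardVectorSize);
            remaining_rows -= emitted;
            ++batches;
        }
        std::printf("emitted 100000 rows in %d batches\n", batches); // 49 batches of <= 2048
        return 0;
    }

This also explains why the exhausted_scan branch now calls SetOutputCardinality() instead of output.SetCardinality(0): once the underlying scan is finished, there may still be pending COUNT(*) cardinality to emit in later batches.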