Regression test for native postgres scan. Handle COUNT(*) in batches.
mkaruza committed Dec 14, 2024
1 parent c25f1be commit de068c6
Showing 7 changed files with 290 additions and 43 deletions.
5 changes: 3 additions & 2 deletions include/pgduckdb/scan/postgres_table_reader.hpp
@@ -12,14 +12,15 @@ namespace pgduckdb {

class PostgresTableReader {
public:
PostgresTableReader(const char *table_scan_query);
PostgresTableReader(const char *table_scan_query, bool count_tuples_only);
~PostgresTableReader();
TupleTableSlot *GetNextTuple();

private:
MinimalTuple GetNextWorkerTuple();
int ParallelWorkerNumber(Cardinality cardinality);

std::string ExplainScanPlan(QueryDesc *query_desc);
bool MarkPlanParallelAware(Plan *plan);
private:
QueryDesc *table_scan_query_desc;
PlanState *table_scan_planstate;
30 changes: 15 additions & 15 deletions src/scan/postgres_scan.cpp
@@ -108,10 +108,6 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput
scan_query << ") ";
}
}

scan_query << ";";

pd_log(DEBUG1, "scan_query: %s", scan_query.str().c_str());
}

std::string
@@ -124,7 +120,8 @@ PostgresScanGlobalState::PostgresScanGlobalState(Snapshot snapshot, Relation rel
: snapshot(snapshot), rel(rel), table_tuple_desc(RelationGetDescr(rel)), count_tuples_only(false),
total_row_count(0) {
ConstructTableScanQuery(input);
table_reader_global_state = duckdb::make_shared_ptr<PostgresTableReader>(scan_query.str().c_str());
table_reader_global_state =
duckdb::make_shared_ptr<PostgresTableReader>(scan_query.str().c_str(), count_tuples_only);
pd_log(DEBUG2, "(DuckDB/PostgresSeqScanGlobalState) Running %" PRIu64 " threads -- ", (uint64_t)MaxThreads());
}

@@ -183,20 +180,27 @@ PostgresScanTableFunction::PostgresScanInitLocal(__attribute__((unused)) duckdb:
auto global_state = reinterpret_cast<PostgresScanGlobalState *>(gstate);
return duckdb::make_uniq<PostgresScanLocalState>(global_state);
}
void
SetOutputCardinality(duckdb::DataChunk &output, PostgresScanLocalState &local_state) {
idx_t output_cardinality =
local_state.output_vector_size <= STANDARD_VECTOR_SIZE ? local_state.output_vector_size : STANDARD_VECTOR_SIZE;
output.SetCardinality(output_cardinality);
local_state.output_vector_size -= output_cardinality;
}

void
PostgresScanTableFunction::PostgresScanFunction(__attribute__((unused)) duckdb::ClientContext &context,
duckdb::TableFunctionInput &data, duckdb::DataChunk &output) {
auto &local_state = data.local_state->Cast<PostgresScanLocalState>();

local_state.output_vector_size = 0;

/* We have exhausted seq scan of heap table so we can return */
/* We have exhausted the table scan */
if (local_state.exhausted_scan) {
output.SetCardinality(0);
SetOutputCardinality(output, local_state);
return;
}

local_state.output_vector_size = 0;

int i = 0;
for (; i < STANDARD_VECTOR_SIZE; i++) {
TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple();
@@ -208,16 +212,12 @@ PostgresScanTableFunction::PostgresScanFunction(__attribute__((unused)) duckdb::
InsertTupleIntoChunk(output, local_state, slot);
}

/* Finishing before reading a complete vector means the scan was exhausted. */
if (i != STANDARD_VECTOR_SIZE) {
local_state.exhausted_scan = true;
}

/* Special case when we only use COUNT(*). We only update output capacity. */
if (local_state.global_state->count_tuples_only && local_state.output_vector_size < STANDARD_VECTOR_SIZE) {
output.SetCapacity(local_state.output_vector_size);
}

output.SetCardinality(local_state.output_vector_size);
SetOutputCardinality(output, local_state);
}
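
The new SetOutputCardinality() helper is what makes COUNT(*) work in batches: for a count-only scan, local_state.output_vector_size holds a tuple count that can exceed a single DuckDB vector, so each call now reports at most one vector's worth and carries the remainder over to the next call (including the exhausted_scan early-return path above). Below is a minimal, self-contained sketch of that draining behaviour; ToyLocalState, EmitBatch and the vector size of 2048 are illustrative assumptions, not pg_duckdb code.

/*
 * Sketch only: spread a large COUNT(*) result over several fixed-size chunks,
 * the way SetOutputCardinality() above caps and decrements output_vector_size.
 */
#include <algorithm>
#include <cstdint>
#include <iostream>

static constexpr uint64_t kStandardVectorSize = 2048; /* assumed DuckDB vector size */

struct ToyLocalState {
    uint64_t output_vector_size = 0; /* rows still to report */
};

/* Mirrors SetOutputCardinality(): cap the reported cardinality per call. */
static uint64_t EmitBatch(ToyLocalState &state) {
    uint64_t cardinality = std::min(state.output_vector_size, kStandardVectorSize);
    state.output_vector_size -= cardinality;
    return cardinality;
}

int main() {
    ToyLocalState state;
    state.output_vector_size = 100000; /* e.g. COUNT(*) over all of t1 */

    uint64_t total = 0, calls = 0;
    while (state.output_vector_size > 0) {
        total += EmitBatch(state); /* one call per PostgresScanFunction() invocation */
        ++calls;
    }
    std::cout << "reported " << total << " rows over " << calls << " chunks\n";
    return 0;
}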

duckdb::unique_ptr<duckdb::NodeStatistics>
65 changes: 62 additions & 3 deletions src/scan/postgres_table_reader.cpp
@@ -7,6 +7,7 @@ extern "C" {
#include "postgres.h"
#include "miscadmin.h"
#include "access/xact.h"
#include "commands/explain.h"
#include "executor/executor.h"
#include "executor/execParallel.h"
#include "executor/tqueue.h"
@@ -24,7 +25,7 @@ extern "C" {

namespace pgduckdb {

PostgresTableReader::PostgresTableReader(const char *table_scan_query)
PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool count_tuples_only)
: parallel_executor_info(nullptr), parallel_worker_readers(nullptr), nreaders(0), next_parallel_reader(0),
entered_parallel_mode(false) {
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());
@@ -55,10 +56,20 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query)
table_scan_planstate =
PostgresFunctionGuard(ExecInitNode, planned_stmt->planTree, table_scan_query_desc->estate, 0);

/* Temp tables cannot be executed with parallel workers */
bool marked_parallel_aware = false;

if (persistence != RELPERSISTENCE_TEMP) {
if (count_tuples_only) {
/* For count_tuples_only we will try to execute aggregate node on table scan */
planned_stmt->planTree->parallel_aware = true;
marked_parallel_aware = MarkPlanParallelAware((Plan *)table_scan_query_desc->planstate->plan->lefttree);
} else {
marked_parallel_aware = MarkPlanParallelAware(table_scan_query_desc->planstate->plan);
}
}

table_scan_query_desc->planstate->plan->parallel_aware = true;
/* Temp tables cannot be executed with parallel workers, and the whole plan should be parallel aware */
if (persistence != RELPERSISTENCE_TEMP && marked_parallel_aware) {

int parallel_workers = ParallelWorkerNumber(planned_stmt->planTree->plan_rows);

@@ -86,6 +97,10 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query)
HOLD_CANCEL_INTERRUPTS();
}

elog(DEBUG1, "(PGDuckdDB/PostgresTableReader)\n\nQUERY: %s\nRUNNING: %s.\nEXECUTING: \n%s", table_scan_query,
!nreaders ? "IN PROCESS THREAD" : psprintf("ON %d PARALLEL WORKER(S)", nreaders),
ExplainScanPlan(table_scan_query_desc).c_str());

slot = PostgresFunctionGuard(ExecInitExtraTupleSlot, table_scan_query_desc->estate,
table_scan_planstate->ps_ResultTupleDesc, &TTSOpsMinimalTuple);
}
@@ -127,6 +142,50 @@ PostgresTableReader::ParallelWorkerNumber(Cardinality cardinality) {
return std::max(1, std::min(base, std::max(max_workers_per_postgres_scan, max_parallel_workers)));
}

std::string
PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) {
ExplainState *es = (ExplainState *)PostgresFunctionGuard(palloc0, sizeof(ExplainState));
es->str = makeStringInfo();
es->format = EXPLAIN_FORMAT_TEXT;
PostgresFunctionGuard(ExplainPrintPlan, es, query_desc);
// // Remove new line char from explain output
// es->str->data[es->str->len - 1] = 0;
std::string explain_scan(es->str->data);
return explain_scan;
}

bool
PostgresTableReader::MarkPlanParallelAware(Plan *plan) {
switch (nodeTag(plan)) {
case T_SeqScan:
case T_IndexScan:
case T_IndexOnlyScan: {
plan->parallel_aware = true;
return true;
}
case T_BitmapHeapScan: {
plan->parallel_aware = true;
return MarkPlanParallelAware(plan->lefttree);
}
case T_BitmapIndexScan: {
((BitmapIndexScan *)plan)->isshared = true;
return true;
}
case T_BitmapAnd: {
return MarkPlanParallelAware((Plan *)linitial(((BitmapAnd *)plan)->bitmapplans));
}
case T_BitmapOr: {
((BitmapOr *)plan)->isshared = true;
return MarkPlanParallelAware((Plan *)linitial(((BitmapOr *)plan)->bitmapplans));
}
default: {
std::ostringstream oss;
oss << "Unknown postgres scan query plan node: " << nodeTag(plan);
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, oss.str().c_str());
}
}
}
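
For the bitmap plan shape exercised by the new regression test below (a Bitmap Heap Scan over a Bitmap Index Scan on t1_a_idx), the recursion above flags the heap scan as parallel_aware and marks the index scan as shared. Here is a small self-contained illustration of that walk; ToyPlan, NodeKind and MarkToyPlanParallelAware are toy stand-ins for PostgreSQL's Plan structs and cover only a subset of the cases handled above.

/* Sketch only: a toy plan walker mirroring the switch above for a bitmap scan. */
#include <cassert>

enum class NodeKind { SeqScan, IndexScan, IndexOnlyScan, BitmapHeapScan, BitmapIndexScan };

struct ToyPlan {
    NodeKind kind = NodeKind::SeqScan;
    bool parallel_aware = false;
    bool isshared = false;       /* meaningful for BitmapIndexScan only */
    ToyPlan *lefttree = nullptr; /* inner plan of a BitmapHeapScan */
};

static bool MarkToyPlanParallelAware(ToyPlan *plan) {
    switch (plan->kind) {
    case NodeKind::SeqScan:
    case NodeKind::IndexScan:
    case NodeKind::IndexOnlyScan:
        plan->parallel_aware = true;
        return true;
    case NodeKind::BitmapHeapScan:
        plan->parallel_aware = true;
        return MarkToyPlanParallelAware(plan->lefttree);
    case NodeKind::BitmapIndexScan:
        plan->isshared = true;
        return true;
    }
    return false; /* the real function throws for unknown plan nodes */
}

int main() {
    ToyPlan index_scan;
    index_scan.kind = NodeKind::BitmapIndexScan;
    ToyPlan heap_scan;
    heap_scan.kind = NodeKind::BitmapHeapScan;
    heap_scan.lefttree = &index_scan;

    assert(MarkToyPlanParallelAware(&heap_scan));
    assert(heap_scan.parallel_aware && index_scan.isshared);
    return 0;
}

When the walk does not mark the plan (or the table is a temp table), the constructor above skips the parallel worker setup and runs the scan in the backend process, which is what the "IN PROCESS THREAD" lines in the regression output show.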

TupleTableSlot *
PostgresTableReader::GetNextTuple() {
MinimalTuple worker_minmal_tuple;
17 changes: 0 additions & 17 deletions test/regression/expected/basic.out
@@ -15,30 +15,13 @@ SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a;
9 | 100
(4 rows)

SET duckdb.max_threads_per_postgres_scan to 4;
SELECT COUNT(*) FROM t;
count
-------
1000
(1 row)

SELECT a, COUNT(*) FROM t WHERE a > 5 GROUP BY a ORDER BY a;
a | count
---+-------
6 | 100
7 | 100
8 | 100
9 | 100
(4 rows)

CREATE TABLE empty(a INT);
SELECT COUNT(*) FROM empty;
count
-------
0
(1 row)

SET duckdb.max_threads_per_postgres_scan TO default;
SET client_min_messages TO default;
DROP TABLE t;
DROP TABLE empty;
164 changes: 164 additions & 0 deletions test/regression/expected/scan_postgres_tables.out
@@ -0,0 +1,164 @@
CREATE TABLE t1(a INT, b INT, c TEXT);
INSERT INTO t1 SELECT g, g % 100, 'scan_potgres_table_'||g from generate_series(1,100000) g;
SET client_min_messages TO DEBUG1;
-- COUNT(*)
SELECT COUNT(*) FROM t1;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT COUNT(*) FROM public.t1
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Aggregate
-> Parallel Seq Scan on t1

count
--------
100000
(1 row)

-- SEQ SCAN
SELECT COUNT(a) FROM t1 WHERE a < 10;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a < 10))

count
-------
9
(1 row)

-- CREATE INDEX on t1
SET client_min_messages TO DEFAULT;
CREATE INDEX ON t1(a);
SET client_min_messages TO DEBUG1;
-- BITMAP INDEX
SELECT COUNT(a) FROM t1 WHERE a < 10;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<10 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Bitmap Heap Scan on t1
Recheck Cond: ((a < 10) AND (a IS NOT NULL))
-> Bitmap Index Scan on t1_a_idx
Index Cond: ((a < 10) AND (a IS NOT NULL))

count
-------
9
(1 row)

-- INDEXONLYSCAN
SET enable_bitmapscan TO false;
SELECT COUNT(a) FROM t1 WHERE a = 1;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a=1 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Only Scan using t1_a_idx on t1
Index Cond: ((a IS NOT NULL) AND (a = 1))

count
-------
1
(1 row)

-- INDEXSCAN
SELECT COUNT(c) FROM t1 WHERE a = 1;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT c FROM public.t1 WHERE (a=1 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Scan using t1_a_idx on t1
Index Cond: ((a IS NOT NULL) AND (a = 1))

count
-------
1
(1 row)

-- TEMPORARY TABLES JOIN WITH HEAP TABLES
SET client_min_messages TO DEFAULT;
CREATE TEMP TABLE t2(a int);
INSERT INTO t2 VALUES (1), (2), (3);
SET client_min_messages TO DEBUG1;
SELECT t1.a, t2.a FROM t1, t2 WHERE t1.a = t2.a;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM pg_temp.t2
RUNNING: IN PROCESS THREAD.
EXECUTING:
Seq Scan on t2

DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a>=1 AND a<=3 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Index Only Scan using t1_a_idx on t1
Index Cond: ((a >= 1) AND (a <= 3) AND (a IS NOT NULL))

a | a
---+---
1 | 1
2 | 2
3 | 3
(3 rows)

-- JOIN WITH SAME TABLE (on WORKERS)
SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a < 2))

DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL)
RUNNING: ON 1 PARALLEL WORKER(S).
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a > 8))

count
-------
99992
(1 row)

-- JOIN WITH SAME TABLE (in BACKEND PROCESS)
SET max_parallel_workers TO 0;
SELECT COUNT(*) FROM t1 AS t1_1, t1 AS t1_2 WHERE t1_1.a < 2 AND t1_2.a > 8;
DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a<2 AND a IS NOT NULL)
RUNNING: IN PROCESS THREAD.
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a < 2))

DEBUG: (PGDuckdDB/PostgresTableReader)

QUERY: SELECT a FROM public.t1 WHERE (a>8 AND a IS NOT NULL)
RUNNING: IN PROCESS THREAD.
EXECUTING:
Parallel Seq Scan on t1
Filter: ((a IS NOT NULL) AND (a > 8))

count
-------
99992
(1 row)

SET max_parallel_workers TO DEFAULT;
SET enable_bitmapscan TO DEFAULT;
SET client_min_messages TO DEFAULT;
DROP TABLE t1, t2;