Skip to content

Commit

Permalink
PR review changes. Cleanup scan, if possible, once the scan is exhausted.
Browse files Browse the repository at this point in the history
  • Loading branch information
mkaruza committed Dec 17, 2024
1 parent 0ea0b70 commit aa6832c
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 37 deletions.
4 changes: 2 additions & 2 deletions include/pgduckdb/pgduckdb_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ __PostgresFunctionGuard__(const char *func_name, FuncArgs... args) {
throw duckdb::Exception(duckdb::ExceptionType::EXECUTOR, message);
}

#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, __VA_ARGS__)
#define PostgresFunctionGuard(FUNC, ...) \
pgduckdb::__PostgresFunctionGuard__<decltype(&FUNC), &FUNC>(__func__, ##__VA_ARGS__)

duckdb::unique_ptr<duckdb::QueryResult> DuckDBQueryOrThrow(duckdb::ClientContext &context, const std::string &query);

Expand Down
2 changes: 1 addition & 1 deletion include/pgduckdb/scan/postgres_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct PostgresScanGlobalState : public duckdb::GlobalTableFunctionState {
Relation rel;
TupleDesc table_tuple_desc;
bool count_tuples_only;
duckdb::vector<duckdb::pair<duckdb::idx_t, AttrNumber>> output_columns;
duckdb::vector<AttrNumber> output_columns;
std::atomic<std::uint32_t> total_row_count;
std::ostringstream scan_query;
duckdb::shared_ptr<PostgresTableReader> table_reader_global_state;
Expand Down
4 changes: 2 additions & 2 deletions include/pgduckdb/scan/postgres_table_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ class PostgresTableReader {
PostgresTableReader(const char *table_scan_query, bool count_tuples_only);
~PostgresTableReader();
TupleTableSlot *GetNextTuple();

void PostgresTableReaderCleanup();
private:
MinimalTuple GetNextWorkerTuple();
int ParallelWorkerNumber(Cardinality cardinality);
std::string ExplainScanPlan(QueryDesc *query_desc);
const char * ExplainScanPlan(QueryDesc *query_desc);
bool MarkPlanParallelAware(Plan *plan);
private:
QueryDesc *table_scan_query_desc;
Expand Down
1 change: 0 additions & 1 deletion include/pgduckdb/utility/rename_ruleutils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#define pg_get_statisticsobjdef_string pgduckdb_pg_get_statisticsobjdef_string
#define get_list_partvalue_string pgduckdb_get_list_partvalue_string


/*
* The following replaces all usages of generate_qualified_relation_name and
* generate_relation_name with calls to the pgduckdb_relation_name function
Expand Down
2 changes: 1 addition & 1 deletion src/pgduckdb_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1312,7 +1312,7 @@ InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresScanLocalState &scan_loc
}
/* Write tuple columns in output vector. */
int duckdb_output_index = 0;
for (auto const &[_, attr_num] : scan_global_state->output_columns) {
for (auto const &attr_num : scan_global_state->output_columns) {
auto &result = output.data[duckdb_output_index];
if (slot->tts_isnull[duckdb_output_index]) {
auto &array_mask = duckdb::FlatVector::Validity(result);
Expand Down
39 changes: 19 additions & 20 deletions src/scan/postgres_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,18 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput
*/
if (input.CanRemoveFilterColumns()) {
for (const auto &projection_id : input.projection_ids) {
output_columns.emplace_back(projection_id, input.column_ids[projection_id] + 1);
output_columns.emplace_back(input.column_ids[projection_id] + 1);
}
} else {
duckdb::idx_t output_index = 0;
for (const auto &column_id : input.column_ids) {
output_columns.emplace_back(output_index++, column_id + 1);
output_columns.emplace_back(column_id + 1);
}
}

scan_query << "SELECT ";

bool first = true;
for (auto const &[duckdb_scanned_index, attr_num] : output_columns) {
for (auto const &attr_num : output_columns) {
if (!first) {
scan_query << ", ";
}
Expand All @@ -95,18 +94,22 @@ PostgresScanGlobalState::ConstructTableScanQuery(duckdb::TableFunctionInitInput

for (auto const &[attr_num, duckdb_scanned_index] : columns_to_scan) {
auto filter = column_filters[duckdb_scanned_index];
if (filter) {
if (first) {
scan_query << " WHERE ";
} else {
scan_query << " AND ";
}
first = false;
scan_query << "(";
auto attr = table_tuple_desc->attrs[attr_num - 1];
scan_query << filter->ToString(attr.attname.data).c_str();
scan_query << ") ";

if (!filter) {
continue;
}

if (first) {
scan_query << " WHERE ";
} else {
scan_query << " AND ";
}

first = false;
scan_query << "(";
auto attr = table_tuple_desc->attrs[attr_num - 1];
scan_query << filter->ToString(attr.attname.data).c_str();
scan_query << ") ";
}
}

Expand Down Expand Up @@ -204,18 +207,14 @@ PostgresScanTableFunction::PostgresScanFunction(duckdb::ClientContext &, duckdb:
for (; i < STANDARD_VECTOR_SIZE; i++) {
TupleTableSlot *slot = local_state.global_state->table_reader_global_state->GetNextTuple();
if (TupIsNull(slot)) {
local_state.global_state->table_reader_global_state->PostgresTableReaderCleanup();
local_state.exhausted_scan = true;
break;
}
slot_getallattrs(slot);
InsertTupleIntoChunk(output, local_state, slot);
}

/* If we finish before reading complete vector means that scan was exhausted. */
if (i != STANDARD_VECTOR_SIZE) {
local_state.exhausted_scan = true;
}

SetOutputCardinality(output, local_state);
}

Expand Down
33 changes: 23 additions & 10 deletions src/scan/postgres_table_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,19 @@ PostgresTableReader::PostgresTableReader(const char *table_scan_query, bool coun

elog(DEBUG1, "(PGDuckdDB/PostgresTableReader)\n\nQUERY: %s\nRUNNING: %s.\nEXECUTING: \n%s", table_scan_query,
!nreaders ? "IN PROCESS THREAD" : psprintf("ON %d PARALLEL WORKER(S)", nreaders),
ExplainScanPlan(table_scan_query_desc).c_str());
ExplainScanPlan(table_scan_query_desc));

slot = PostgresFunctionGuard(ExecInitExtraTupleSlot, table_scan_query_desc->estate,
table_scan_planstate->ps_ResultTupleDesc, &TTSOpsMinimalTuple);
}

/*
 * Tears down the scan state unless that has already happened.
 * PostgresTableReaderCleanup() resets table_scan_query_desc to nullptr when it
 * finishes, so a null descriptor here means there is nothing left to release.
 *
 * NOTE(review): the cleanup routes its Postgres calls through
 * PostgresFunctionGuard, which reports failures by throwing. An exception that
 * escapes a destructor terminates the process -- confirm this is acceptable.
 */
PostgresTableReader::~PostgresTableReader() {
	if (table_scan_query_desc == nullptr) {
		return;
	}

	PostgresTableReaderCleanup();
}

void PostgresTableReader::PostgresTableReaderCleanup() {
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());

PostgresScopedStackReset scoped_stack_reset;
Expand All @@ -127,13 +133,13 @@ PostgresTableReader::~PostgresTableReader() {
PostgresFunctionGuard(ExecParallelCleanup, parallel_executor_info);
}

parallel_executor_info = NULL;
parallel_executor_info = nullptr;

if (parallel_worker_readers) {
PostgresFunctionGuard(pfree, parallel_worker_readers);
}

parallel_worker_readers = NULL;
parallel_worker_readers = nullptr;

PostgresFunctionGuard(ExecutorFinish, table_scan_query_desc);
PostgresFunctionGuard(ExecutorEnd, table_scan_query_desc);
Expand All @@ -142,6 +148,8 @@ PostgresTableReader::~PostgresTableReader() {
if (entered_parallel_mode) {
ExitParallelMode();
}

table_scan_query_desc = nullptr;
}

int
Expand All @@ -152,16 +160,21 @@ PostgresTableReader::ParallelWorkerNumber(Cardinality cardinality) {
return std::max(1, std::min(base, std::max(max_workers_per_postgres_scan, max_parallel_workers)));
}

std::string
PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) {
ExplainState *es = (ExplainState *)PostgresFunctionGuard(palloc0, sizeof(ExplainState));
const char *
ExplainScanPlan_Unsafe(QueryDesc *query_desc) {
ExplainState *es = (ExplainState *)palloc0(sizeof(ExplainState));
es->str = makeStringInfo();
es->format = EXPLAIN_FORMAT_TEXT;
PostgresFunctionGuard(ExplainPrintPlan, es, query_desc);
std::string explain_scan(es->str->data);
return explain_scan;
ExplainPrintPlan(es, query_desc);
return es->str->data;
}

/*
 * Returns the EXPLAIN text for `query_desc`. The work is done by
 * ExplainScanPlan_Unsafe, invoked through PostgresFunctionGuard rather than
 * called directly.
 *
 * NOTE(review): the returned pointer is the data buffer of a StringInfo built
 * by the helper; presumably it lives only as long as the Postgres memory
 * context it was allocated in -- confirm before holding on to it.
 */
const char *
PostgresTableReader::ExplainScanPlan(QueryDesc *query_desc) {
	const char *plan_text = PostgresFunctionGuard(ExplainScanPlan_Unsafe, query_desc);
	return plan_text;
}


bool
PostgresTableReader::MarkPlanParallelAware(Plan *plan) {
switch (nodeTag(plan)) {
Expand Down Expand Up @@ -208,7 +221,7 @@ PostgresTableReader::GetNextTuple() {
std::lock_guard<std::mutex> lock(GlobalProcessLock::GetLock());
PostgresScopedStackReset scoped_stack_reset;
table_scan_query_desc->estate->es_query_dsa = parallel_executor_info ? parallel_executor_info->area : NULL;
thread_scan_slot = ExecProcNode(table_scan_planstate);
thread_scan_slot = PostgresFunctionGuard(ExecProcNode, table_scan_planstate);
table_scan_query_desc->estate->es_query_dsa = NULL;
if (!TupIsNull(thread_scan_slot)) {
return thread_scan_slot;
Expand Down

0 comments on commit aa6832c

Please sign in to comment.