Skip to content

Commit

Permalink
Merge pull request #15 from hydradatabase/projections-selection
Browse files Browse the repository at this point in the history
Heap scan column selection, filtering and projection
  • Loading branch information
mkaruza authored May 2, 2024
2 parents d4ffc83 + a6cb22d commit 3cdef3a
Show file tree
Hide file tree
Showing 14 changed files with 480 additions and 63 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ MODULE_big = quack
EXTENSION = quack
DATA = quack.control $(wildcard quack--*.sql)

SRCS = src/quack_heap_seq_scan.cpp \
SRCS = src/quack_detoast.cpp \
src/quack_filter.cpp \
src/quack_heap_scan.cpp \
src/quack_heap_seq_scan.cpp \
src/quack_hooks.cpp \
src/quack_select.cpp \
src/quack_types.cpp \
Expand Down Expand Up @@ -34,7 +36,7 @@ else
QUACK_BUILD_DUCKDB = release
endif

override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 ${QUACK_BUILD_CXX_FLAGS}
override PG_CPPFLAGS += -Iinclude -Ithird_party/duckdb/src/include -std=c++17 -Wno-sign-compare ${QUACK_BUILD_CXX_FLAGS}

SHLIB_LINK += -Wl,-rpath,$(PG_LIB)/ -lpq -L$(PG_LIB) -lduckdb -Lthird_party/duckdb/build/$(QUACK_BUILD_DUCKDB)/src -lstdc++

Expand Down
15 changes: 15 additions & 0 deletions include/quack/quack_detoast.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
}

#include <mutex>

namespace quack {

/*
 * Returns a detoasted form of `value` as a Datum (pointer to a varlena).
 *
 * value      - varlena attribute that may be external (on-disk, indirect,
 *              expanded), compressed, or stored with a short header.
 * lock       - mutex held by the implementation while the toast relation is
 *              opened and read.
 * shouldFree - output flag; set to false when `value` itself is returned
 *              unchanged, and to true when the result is a buffer the
 *              implementation allocated (with duckdb_malloc), which the caller
 *              is then expected to release.
 */
Datum DetoastPostgresDatum(struct varlena *value, std::mutex &lock, bool *shouldFree);

} // namespace quack
11 changes: 11 additions & 0 deletions include/quack/quack_filter.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#pragma once

#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
}

namespace quack {
/*
 * Evaluates a DuckDB table filter against a single Postgres value.
 *
 * filter  - DuckDB filter to evaluate.
 * value   - Postgres Datum holding the column value.
 * isNull  - whether the column value is SQL NULL.
 * typeOid - Postgres type OID describing how `value` should be interpreted.
 *
 * NOTE(review): presumably returns true when the value satisfies the filter;
 * the definition is not visible here - confirm against src/quack_filter.cpp.
 */
bool ApplyValueFilter(duckdb::TableFilter &filter, Datum &value, bool isNull, Oid typeOid);
} // namespace quack
7 changes: 3 additions & 4 deletions include/quack/quack_heap_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ extern "C" {

// Postgres Relation


namespace quack {

struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
Expand All @@ -23,15 +22,15 @@ struct PostgresHeapScanLocalState : public duckdb::LocalTableFunctionState {
~PostgresHeapScanLocalState() override;

public:
PostgresHeapSeqScan & m_rel;
PostgresHeapSeqScan &m_rel;
PostgresHeapSeqScanThreadInfo m_thread_seq_scan_info;
bool m_exhausted_scan = false;
};

// Global State

struct PostgresHeapScanGlobalState : public duckdb::GlobalTableFunctionState {
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation);
explicit PostgresHeapScanGlobalState(PostgresHeapSeqScan &relation, duckdb::TableFunctionInitInput &input);
~PostgresHeapScanGlobalState();
idx_t
MaxThreads() const override {
Expand Down Expand Up @@ -67,7 +66,7 @@ struct PostgresHeapScanFunction : public duckdb::TableFunction {
// LocalTableFunctionState *lstate, GlobalTableFunctionState *gstate); static double PostgresProgress(ClientContext
// &context, const FunctionData *bind_data_p, const GlobalTableFunctionState *gstate);
static void PostgresHeapScanFunc(duckdb::ClientContext &context, duckdb::TableFunctionInput &data_p,
duckdb::DataChunk &output);
duckdb::DataChunk &output);
// static unique_ptr<NodeStatistics> PostgresCardinality(ClientContext &context, const FunctionData *bind_data);
// static idx_t PostgresGetBatchIndex(ClientContext &context, const FunctionData *bind_data_p,
// LocalTableFunctionState *local_state, GlobalTableFunctionState *global_state); static void
Expand Down
43 changes: 31 additions & 12 deletions include/quack/quack_heap_seq_scan.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ extern "C" {
#include "postgres.h"
#include "access/tableam.h"
#include "access/heapam.h"
#include "storage/bufmgr.h"
}

#include <mutex>
#include <atomic>

namespace quack {

Expand All @@ -31,18 +33,35 @@ class PostgresHeapSeqScanThreadInfo {
HeapTupleData m_tuple;
};

class PostgresHeapSeqScan {
class PostgresHeapSeqParallelScanState {
private:
class ParallelScanState {
public:
ParallelScanState() : m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber) {
}
BlockNumber AssignNextBlockNumber();
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
};
static int const k_max_prefetch_block_number = 32;

public:
PostgresHeapSeqParallelScanState()
: m_nblocks(InvalidBlockNumber), m_last_assigned_block_number(InvalidBlockNumber), m_count_tuples_only(false),
m_total_row_count(0), m_last_prefetch_block(0), m_strategy(nullptr) {
}
~PostgresHeapSeqParallelScanState() {
if (m_strategy)
pfree(m_strategy);
}
BlockNumber AssignNextBlockNumber();
void PrefetchNextRelationPages(Relation rel);
std::mutex m_lock;
BlockNumber m_nblocks;
BlockNumber m_last_assigned_block_number;
bool m_count_tuples_only;
duckdb::map<duckdb::idx_t, duckdb::idx_t> m_columns;
duckdb::map<duckdb::idx_t, duckdb::idx_t> m_projections;
duckdb::TableFilterSet *m_filters = nullptr;
std::atomic<std::uint32_t> m_total_row_count;
BlockNumber m_last_prefetch_block;
BufferAccessStrategy m_strategy;
};

class PostgresHeapSeqScan {
private:
public:
PostgresHeapSeqScan(RangeTblEntry *table);
~PostgresHeapSeqScan();
Expand All @@ -52,7 +71,7 @@ class PostgresHeapSeqScan {
PostgresHeapSeqScan(PostgresHeapSeqScan &&other);

public:
void InitParallelScanState();
void InitParallelScanState( duckdb::TableFunctionInitInput &input);
void
SetSnapshot(Snapshot snapshot) {
m_snapshot = snapshot;
Expand All @@ -70,7 +89,7 @@ class PostgresHeapSeqScan {
private:
Relation m_rel = nullptr;
Snapshot m_snapshot = nullptr;
ParallelScanState m_parallel_scan_state;
PostgresHeapSeqParallelScanState m_parallel_scan_state;
};

} // namespace quack
1 change: 0 additions & 1 deletion include/quack/quack_select.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

extern "C" {
#include "postgres.h"

#include "executor/executor.h"
}

Expand Down
5 changes: 4 additions & 1 deletion include/quack/quack_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ extern "C" {
#include "executor/tuptable.h"
}

#include "quack/quack_heap_seq_scan.hpp"

namespace quack {
duckdb::LogicalType ConvertPostgresToDuckColumnType(Oid type);
void ConvertPostgresToDuckValue(Datum value, duckdb::Vector &result, idx_t offset);
void ConvertDuckToPostgresValue(TupleTableSlot *slot, duckdb::Value &value, idx_t col);
void InsertTupleIntoChunk(duckdb::DataChunk &output, TupleDesc tuple, HeapTupleData *slot, idx_t offset);
void InsertTupleIntoChunk(duckdb::DataChunk &output, PostgresHeapSeqScanThreadInfo &threadScanInfo,
PostgresHeapSeqParallelScanState &parallelScanState);
} // namespace quack
165 changes: 165 additions & 0 deletions src/quack_detoast.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#include "duckdb.hpp"

extern "C" {
#include "postgres.h"
#include "pg_config.h"
#include "varatt.h"

#ifdef USE_LZ4
#include <lz4.h>
#endif

#include "access/detoast.h"
#include "access/table.h"
#include "access/tableam.h"
#include "access/toast_internals.h"
#include "common/pg_lzcompress.h"
#include "utils/expandeddatum.h"
}

#include "quack/quack_types.hpp"
#include "quack/quack_detoast.hpp"

/*
 * The following functions mirror logic found in the Postgres source, but they need to be thread safe for DuckDB
 * execution. Calls to palloc/pfree are replaced with duckdb_malloc/duckdb_free, and access to the toast table is
 * protected with a lock, also for thread-safety reasons. This is an initial implementation and should be revisited
 * in the future for better performance.
 */

namespace quack {

struct varlena *
_pglz_decompress_datum(const struct varlena *value) {
struct varlena *result;
int32 rawsize;

result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ);

rawsize = pglz_decompress((char *)value + VARHDRSZ_COMPRESSED, VARSIZE(value) - VARHDRSZ_COMPRESSED,
VARDATA(result), VARDATA_COMPRESSED_GET_EXTSIZE(value), true);
if (rawsize < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed pglz data is corrupt")));

SET_VARSIZE(result, rawsize + VARHDRSZ);

return result;
}

struct varlena *
_lz4_decompress_datum(const struct varlena *value) {
#ifndef USE_LZ4
return NULL; /* keep compiler quiet */
#else
int32 rawsize;
struct varlena *result;

result = (struct varlena *)duckdb_malloc(VARDATA_COMPRESSED_GET_EXTSIZE(value) + VARHDRSZ);

rawsize = LZ4_decompress_safe((char *)value + VARHDRSZ_COMPRESSED, VARDATA(result),
VARSIZE(value) - VARHDRSZ_COMPRESSED, VARDATA_COMPRESSED_GET_EXTSIZE(value));
if (rawsize < 0)
ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg_internal("compressed lz4 data is corrupt")));

SET_VARSIZE(result, rawsize + VARHDRSZ);

return result;
#endif
}

static struct varlena *
_toast_decompress_datum(struct varlena *attr) {
ToastCompressionId cmid;
cmid = (ToastCompressionId)TOAST_COMPRESS_METHOD(attr);
switch (cmid) {
case TOAST_PGLZ_COMPRESSION_ID:
return _pglz_decompress_datum(attr);
case TOAST_LZ4_COMPRESSION_ID:
return _lz4_decompress_datum(attr);
default:
elog(ERROR, "invalid compression method id %d", TOAST_COMPRESS_METHOD(attr));
return NULL; /* keep compiler quiet */
}
}

static struct varlena *
_toast_fetch_datum(struct varlena *attr, std::mutex &lock) {
Relation toastrel;
struct varlena *result;
struct varatt_external toast_pointer;
int32 attrsize;

if (!VARATT_IS_EXTERNAL_ONDISK(attr))
elog(ERROR, "toast_fetch_datum shouldn't be called for non-ondisk datums");

/* Must copy to access aligned fields */
VARATT_EXTERNAL_GET_POINTER(toast_pointer, attr);

attrsize = VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer);

result = (struct varlena *)duckdb_malloc(attrsize + VARHDRSZ);

if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) {
SET_VARSIZE_COMPRESSED(result, attrsize + VARHDRSZ);
} else {
SET_VARSIZE(result, attrsize + VARHDRSZ);
}

if (attrsize == 0)
return result;

lock.lock();
toastrel = table_open(toast_pointer.va_toastrelid, AccessShareLock);
table_relation_fetch_toast_slice(toastrel, toast_pointer.va_valueid, attrsize, 0, attrsize, result);
table_close(toastrel, AccessShareLock);
lock.unlock();

return result;
}

/*
 * Produce a detoasted version of a varlena attribute.
 *
 * attr       - attribute to detoast; may be an external on-disk pointer, an
 *              indirect pointer, an expanded object, in-line compressed, or
 *              stored with a short (1-byte) header.
 * lock       - mutex taken while the toast relation is read.
 * shouldFree - output flag. Set to true when the returned value is a buffer
 *              allocated here with duckdb_malloc (the caller must release it
 *              with duckdb_free); set to false when `attr` is returned as is.
 *
 * Returns the resulting varlena pointer as a Datum.
 */
Datum
DetoastPostgresDatum(struct varlena *attr, std::mutex &lock, bool *shouldFree) {
    struct varlena *toastedValue = nullptr;
    *shouldFree = true;
    if (VARATT_IS_EXTERNAL_ONDISK(attr)) {
        toastedValue = _toast_fetch_datum(attr, lock);
        /* The stored value may itself be compressed; expand it and drop the fetched copy. */
        if (VARATT_IS_COMPRESSED(toastedValue)) {
            struct varlena *tmp = toastedValue;
            toastedValue = _toast_decompress_datum(tmp);
            duckdb_free(tmp);
        }
    } else if (VARATT_IS_EXTERNAL_INDIRECT(attr)) {
        struct varatt_indirect redirect;
        VARATT_EXTERNAL_GET_POINTER(redirect, attr);
        struct varlena *target = (struct varlena *)redirect.pointer;
        /*
         * Recurse on the value the indirect pointer refers to (not on `attr`
         * itself, which would never terminate) in case it is still extended
         * in some other way.
         */
        toastedValue = reinterpret_cast<struct varlena *>(DetoastPostgresDatum(target, lock, shouldFree));
        /* If the target was already plain, hand back a private copy of it. */
        if (toastedValue == target) {
            const Size targetSize = VARSIZE_ANY(target);
            struct varlena *copy = (struct varlena *)duckdb_malloc(targetSize);
            memcpy(copy, target, targetSize);
            toastedValue = copy;
            /* The recursive call cleared the flag; the copy is owned by the caller. */
            *shouldFree = true;
        }
    } else if (VARATT_IS_EXTERNAL_EXPANDED(attr)) {
        /* Flatten the expanded object into a newly allocated buffer. */
        ExpandedObjectHeader *eoh = DatumGetEOHP(PointerGetDatum(attr));
        Size resultsize = EOH_get_flat_size(eoh);
        toastedValue = (struct varlena *)duckdb_malloc(resultsize);
        EOH_flatten_into(eoh, (void *)toastedValue, resultsize);
    } else if (VARATT_IS_COMPRESSED(attr)) {
        toastedValue = _toast_decompress_datum(attr);
    } else if (VARATT_IS_SHORT(attr)) {
        /* Convert the short header into a regular 4-byte header. */
        Size data_size = VARSIZE_SHORT(attr) - VARHDRSZ_SHORT;
        Size new_size = data_size + VARHDRSZ;
        toastedValue = (struct varlena *)duckdb_malloc(new_size);
        SET_VARSIZE(toastedValue, new_size);
        memcpy(VARDATA(toastedValue), VARDATA_SHORT(attr), data_size);
    } else {
        /* Already a plain value: return it untouched, nothing to free. */
        toastedValue = attr;
        *shouldFree = false;
    }

    return reinterpret_cast<Datum>(toastedValue);
}

} // namespace quack
Loading

0 comments on commit 3cdef3a

Please sign in to comment.