Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
313 changes: 284 additions & 29 deletions cpp/deeplake_pg/duckdb_deeplake_scan.cpp

Large diffs are not rendered by default.

80 changes: 80 additions & 0 deletions cpp/deeplake_pg/duckdb_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#include "table_storage.hpp"
#include "utils.hpp"

#include <base/system_report.hpp>

#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -60,6 +62,41 @@ struct duckdb_connections {
}
};

/**
* @brief Creates DuckDB database and connection with memory management configuration
*
* DuckDB Memory Management:
* -------------------------
* This function configures DuckDB's memory_limit and temp_directory settings to prevent
* out-of-memory crashes during large operations such as hash joins (e.g., TPC-H Q13 at SF100+).
*
* Memory Limit Configuration:
* - Controlled by pg_deeplake.duckdb_memory_limit_mb GUC parameter (default: 0 = auto-detect)
* - Auto-detection uses 80% of system memory with 256MB minimum floor
* - When memory limit is exceeded, DuckDB spills intermediate results to disk
* - Memory values use MB units consistently throughout the codebase
*
* Disk Spilling:
* - Controlled by pg_deeplake.duckdb_temp_directory GUC parameter (default: empty = DuckDB default)
* - temp_directory enables disk spilling when memory_limit is exceeded
* - Requires sufficient disk space for large operations (recommend SSD with adequate capacity)
* - Temporary files are automatically cleaned up after query completion
*
* Container Considerations:
* - IMPORTANT: For containerized environments with cgroup limits, base::system_report::total_memory()
* may return host memory instead of container limits
* - Set pg_deeplake.duckdb_memory_limit_mb explicitly in such environments to avoid OOM
* - Example: SET pg_deeplake.duckdb_memory_limit_mb = 4096; -- 4GB limit for container
*
* Tuning Guidelines:
* - For dedicated database servers: auto-detection (80% system memory) is safe
* - For shared/containerized environments: set explicit limit based on available memory
* - For OLTP workloads: smaller limit with faster spilling (e.g., 25% of memory)
* - For OLAP workloads: larger limit for better performance (e.g., 80% of memory)
*
* @return Unique pointer to duckdb_connections structure containing both C++ and C API connections
* @throws std::exception if DuckDB connection creation or configuration fails
*/
std::unique_ptr<duckdb_connections> create_connections()
{
try {
Expand All @@ -71,6 +108,49 @@ std::unique_ptr<duckdb_connections> create_connections()
conns->db_cpp = std::make_unique<duckdb::DuckDB>(":memory:", &config);
conns->con_cpp = std::make_unique<duckdb::Connection>(*(conns->db_cpp));

// Configure DuckDB memory limit and temp directory for disk spilling
// This prevents OOM crashes during large operations like JOINs (e.g., TPC-H Q13 at SF100+)
uint64_t mem_limit_mb = 0;
if (pg::duckdb_memory_limit_mb > 0) {
// Use explicit configuration
mem_limit_mb = static_cast<uint64_t>(pg::duckdb_memory_limit_mb);
} else {
// Auto-detect: use 80% of system memory with 256MB minimum
// Note: For containerized environments with cgroup limits, base::system_report::total_memory()
// may return host memory. Set pg_deeplake.duckdb_memory_limit_mb explicitly in containers.
mem_limit_mb = static_cast<uint64_t>(base::system_report::total_memory() * 0.8 / (1024ULL * 1024ULL));
if (mem_limit_mb < 256) {
mem_limit_mb = 256;
}
}

// Set memory_limit (DuckDB uses MB units)
auto mem_result = conns->con_cpp->Query(fmt::format("SET memory_limit='{}MB'", mem_limit_mb));
if (!mem_result || mem_result->HasError()) {
elog(WARNING, "Failed to set DuckDB memory_limit: %s",
mem_result ? mem_result->GetError().c_str() : "null result");
}

// Set temp_directory if configured (enables disk spilling)
if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) {
auto temp_result = conns->con_cpp->Query(fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory));
if (!temp_result || temp_result->HasError()) {
elog(WARNING, "Failed to set DuckDB temp_directory: %s",
temp_result ? temp_result->GetError().c_str() : "null result");
}
}

// Verify and log the applied settings at INFO level for operational visibility
auto verify_mem = conns->con_cpp->Query("SELECT current_setting('memory_limit')");
if (verify_mem && !verify_mem->HasError() && verify_mem->RowCount() > 0) {
elog(INFO, "DuckDB memory_limit: %s", verify_mem->GetValue(0, 0).ToString().c_str());
}

auto verify_temp = conns->con_cpp->Query("SELECT current_setting('temp_directory')");
if (verify_temp && !verify_temp->HasError() && verify_temp->RowCount() > 0) {
elog(INFO, "DuckDB temp_directory: %s", verify_temp->GetValue(0, 0).ToString().c_str());
}

// Register the deeplake_scan table function for zero-copy access
pg::register_deeplake_scan_function(*(conns->con_cpp));

Expand Down
33 changes: 33 additions & 0 deletions cpp/deeplake_pg/extension_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ bool use_shared_mem_for_refresh = false;
bool enable_dataset_logging = false; // Enable dataset operation logging for debugging
bool allow_custom_paths = true; // Allow dataset_path in CREATE TABLE options
bool stateless_enabled = false; // Enable stateless catalog sync across instances
int32_t duckdb_memory_limit_mb = 0; // DuckDB memory limit in MB (0 = auto-detect)
char* duckdb_temp_directory = nullptr; // DuckDB temp directory for disk spilling (empty = default)

} // namespace pg

Expand Down Expand Up @@ -277,6 +279,37 @@ void initialize_guc_parameters()
);


// DuckDB memory configuration
DefineCustomIntVariable("pg_deeplake.duckdb_memory_limit_mb",
"Memory limit for DuckDB operations in MB (0 = auto-detect).",
"This parameter controls DuckDB's internal memory budget for large operations like JOINs. "
"When set to 0, the limit is auto-detected as 80% of system memory. "
"DuckDB will spill to disk when this limit is exceeded, preventing out-of-memory crashes.",
&pg::duckdb_memory_limit_mb, // linked C variable
0, // default value (auto-detect)
0, // min value (0 = auto)
INT_MAX, // max value
PGC_USERSET, // context (USERSET, SUSET, etc.)
GUC_UNIT_MB, // flags - treat as MB
nullptr,
nullptr,
nullptr // check_hook, assign_hook, show_hook
);

DefineCustomStringVariable("pg_deeplake.duckdb_temp_directory",
"Temporary directory for DuckDB disk spilling during large operations.",
"Specifies where DuckDB writes temporary files when memory_limit is exceeded. "
"Empty string (default) uses DuckDB's default temp location. "
"DuckDB will validate the path at runtime and fail gracefully if invalid.",
&pg::duckdb_temp_directory, // linked C variable
"", // default value (empty = DuckDB default)
PGC_USERSET, // context - can be set by any user
0, // flags
nullptr, // check_hook
nullptr, // assign_hook
nullptr // show_hook
);

// Initialize PostgreSQL memory tracking
pg::memory_tracker::initialize_guc_parameters();

Expand Down
7 changes: 7 additions & 0 deletions cpp/deeplake_pg/table_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ struct table_data

template <typename T>
inline const T* value_ptr(int32_t column_number, int64_t row_number);

// Get string holder for a batch containing the given row (for zero-copy string access)
inline std::pair<impl::string_stream_array_holder*, int64_t> get_string_batch(
int32_t column_number, int64_t row_number);

// Get batch owner array for a given row (for keeping buffer alive in DuckDB vector)
inline nd::array* get_batch_owner(int32_t column_number, int64_t row_number);
};

inline streamer_info& get_streamers() noexcept;
Expand Down
55 changes: 55 additions & 0 deletions cpp/deeplake_pg/table_data_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,19 @@ inline void table_data::open_dataset(bool create)
ASSERT(dataset_ != nullptr);
num_total_rows_ = dataset_->num_rows();

// Validate row count doesn't exceed BlockNumber limits for TID conversion
// With TUPLES_PER_BLOCK=256 and BlockNumber=uint32_t, max is ~1.1 trillion rows
constexpr int64_t MAX_SUPPORTED_ROWS =
static_cast<int64_t>(UINT32_MAX) * static_cast<int64_t>(pg::DEEPLAKE_TUPLES_PER_BLOCK);
if (num_total_rows_ > MAX_SUPPORTED_ROWS) {
elog(WARNING,
"Table '%s' has %ld rows, exceeding max supported %ld for TID conversion. "
"Consider partitioning or sharding.",
table_name_.c_str(),
num_total_rows_,
MAX_SUPPORTED_ROWS);
}

// Enable logging if GUC parameter is set
if (pg::enable_dataset_logging && dataset_ && !dataset_->is_logging_enabled()) {
dataset_->start_logging();
Expand Down Expand Up @@ -631,6 +644,48 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number,
return batch.holder_.data(static_cast<size_t>(row_in_batch));
}

inline std::pair<impl::string_stream_array_holder*, int64_t>
table_data::streamer_info::get_string_batch(int32_t column_number, int64_t row_number)
{
const int64_t batch_index = row_number >> batch_size_log2_;
const int64_t row_in_batch = row_number & batch_mask_;

auto& col_data = column_to_batches[column_number];
auto& batch = col_data.batches[batch_index];
if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
std::lock_guard lock(col_data.mutex_);
for (int64_t i = 0; i <= batch_index; ++i) {
if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Race condition risk: The double-checked locking pattern initializes multiple batches (loop from 0 to batch_index) while only checking batch.initialized_ for the target batch. If two threads request different batches concurrently, both may enter the lock and initialize overlapping ranges, potentially causing data races on batch.owner_ and batch.holder_.

Consider checking and setting initialized flag for each batch i inside the loop to prevent concurrent initialization of the same batch.

col_data.batches[i].owner_ = streamers[column_number]->next_batch();
col_data.batches[i].holder_ = impl::string_stream_array_holder(col_data.batches[i].owner_);
col_data.batches[i].initialized_.store(true, std::memory_order_release);
}
}
}

return {&batch.holder_, row_in_batch};
}

// Returns a pointer to the nd::array that owns the backing buffer of the batch
// containing `row_number`, so callers (e.g. a DuckDB vector) can keep that
// buffer alive while referencing its data. Initializes batches lazily with the
// same double-checked-locking scheme as get_string_batch().
inline nd::array* table_data::streamer_info::get_batch_owner(int32_t column_number, int64_t row_number)
{
    // Power-of-two batch size: the containing batch is found with a shift.
    const int64_t batch_index = row_number >> batch_size_log2_;

    auto& col_data = column_to_batches[column_number];
    auto& batch = col_data.batches[batch_index];
    // Fast path: acquire-load pairs with the release store below, so an
    // already-initialized batch is read without taking the mutex.
    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
        std::lock_guard lock(col_data.mutex_);
        // Materialize every missing batch up to the target, in order — the
        // sequential fill implies next_batch() hands out batches in sequence
        // (NOTE(review): assumed from this loop; confirm with the streamer API).
        // The relaxed re-check per batch is safe: all writers hold mutex_.
        for (int64_t i = 0; i <= batch_index; ++i) {
            if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) {
                col_data.batches[i].owner_ = streamers[column_number]->next_batch();
                col_data.batches[i].holder_ = impl::string_stream_array_holder(col_data.batches[i].owner_);
                col_data.batches[i].initialized_.store(true, std::memory_order_release);
            }
        }
    }

    return &batch.owner_;
}

inline bool table_data::flush()
{
const bool s1 = flush_inserts(true);
Expand Down
2 changes: 2 additions & 0 deletions cpp/deeplake_pg/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ extern bool use_shared_mem_for_refresh;
extern bool enable_dataset_logging;
extern bool allow_custom_paths;
extern bool stateless_enabled;
extern int32_t duckdb_memory_limit_mb;
extern char* duckdb_temp_directory;

namespace utils {

Expand Down
115 changes: 112 additions & 3 deletions cpp/nd/string_array_holder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "array.hpp"

#include <algorithm>
#include <string_view>

namespace nd {
Expand All @@ -23,20 +24,128 @@ class string_stream_array_holder
return is_valid_;
}

// True when all string data lives in one contiguous chunk. In that case
// batch_data_fast() provides O(1) access with no binary search.
inline bool is_single_chunk() const noexcept
{
    return 1 == offsets_.size();
}

// OPTIMIZATION: Fast batch data access for single-chunk case.
// Avoids binary search overhead in get_range_data() by directly computing string bounds.
// REQUIRES: is_single_chunk() == true, index must be valid
inline std::string_view batch_data_fast(int64_t index) const noexcept
{
// Single chunk: offsets_[0] = 0, buffer_cache_[0], offsets_cache_[0] are the only entries
const auto local_idx = range_offsets_[0] + index;
const auto* offsets = offsets_cache_[0];
const auto start_offset = offsets[0];
const auto str_start = offsets[local_idx] - start_offset;
const auto str_end = offsets[local_idx + 1] - start_offset;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential buffer overrun: Accessing offsets[local_idx + 1] without bounds checking. The function requires is_single_chunk() == true but doesn't validate that local_idx + 1 is within the valid offset array range.

Add bounds validation before access to prevent undefined behavior.

const auto* buffer = buffer_cache_[0];
return std::string_view(reinterpret_cast<const char*>(buffer + str_start), str_end - str_start);
}

// Bulk-access view over a run of contiguous string data: the raw byte buffer
// plus the offset array delimiting each string. Enables callers to set up all
// string entries for a batch in a single pass instead of per-string calls.
// Returned by get_contiguous_strings() (single-chunk) and get_chunk_strings()
// (per-chunk). Pointers stay valid only while the owning holder keeps the
// source arrays alive.
struct contiguous_string_data
{
    const uint8_t* buffer; // Raw string bytes for the chunk
    const uint32_t* offsets; // offsets[i]..offsets[i+1] delimit string i (absolute values)
    uint32_t base_offset; // Subtract from offset values to index into `buffer`
    int64_t start_index; // First relevant index within `offsets`
};

// Direct buffer/offset access for the single-chunk layout, positioned at
// `batch_start`. See contiguous_string_data for field semantics.
inline contiguous_string_data get_contiguous_strings(int64_t batch_start) const noexcept
{
    const auto first = range_offsets_[0] + batch_start;
    const auto* offs = offsets_cache_[0];
    return {buffer_cache_[0], offs, offs[0], first};
}

// Total byte length of `count` consecutive strings starting at `batch_start`
// (useful for pre-allocation). Only implemented for the single-chunk layout;
// returns 0 otherwise so callers can fall back to per-string accounting.
// PRECONDITION (unchecked): batch_start + count must stay within the chunk —
// offsets[local + count] is read without a bounds check.
inline uint64_t get_batch_total_bytes(int64_t batch_start, int64_t count) const noexcept
{
    if (!is_single_chunk()) {
        return 0;
    }
    const auto first = range_offsets_[0] + batch_start;
    const auto* offs = offsets_cache_[0];
    return offs[first + count] - offs[first];
}

// MULTI-CHUNK SUPPORT: Methods for iterating over data that spans multiple chunks.
// These enable efficient batch processing without per-row binary search overhead.

// Find which chunk contains the given global index.
// Returns: (chunk_index, row_within_chunk)
inline std::pair<size_t, int64_t> find_chunk(int64_t global_index) const noexcept
{
// offsets_ contains cumulative row counts: [chunk0_end, chunk0_end+chunk1_size, ...]
// So chunk i spans from (i==0 ? 0 : offsets_[i-1]) to offsets_[i]
//
// Binary search to find first cumulative offset > global_index
auto it = std::upper_bound(offsets_.begin(), offsets_.end(), global_index);
size_t chunk_idx = static_cast<size_t>(std::distance(offsets_.begin(), it));

// Calculate row within chunk
int64_t chunk_start = (chunk_idx == 0) ? 0 : offsets_[chunk_idx - 1];
int64_t row_in_chunk = global_index - chunk_start;

return {chunk_idx, row_in_chunk};
}

// Get the global row range [start, end) for a specific chunk.
inline std::pair<int64_t, int64_t> get_chunk_bounds(size_t chunk_idx) const noexcept
{
// offsets_ is cumulative: chunk i spans from (i==0 ? 0 : offsets_[i-1]) to offsets_[i]
int64_t chunk_start = (chunk_idx == 0) ? 0 : offsets_[chunk_idx - 1];
int64_t chunk_end = offsets_[chunk_idx];
return {chunk_start, chunk_end};
}

// Multi-chunk analogue of get_contiguous_strings(): raw buffer/offset access
// for chunk `chunk_idx`, positioned at `row_in_chunk`.
inline contiguous_string_data get_chunk_strings(size_t chunk_idx, int64_t row_in_chunk) const noexcept
{
    const auto* offs = offsets_cache_[chunk_idx];
    const auto first = range_offsets_[chunk_idx] + row_in_chunk;
    return {buffer_cache_[chunk_idx], offs, offs[0], first};
}

// Number of chunks backing this holder (0 when the holder is empty).
inline size_t num_chunks() const noexcept
{
    // offsets_ holds one cumulative entry per chunk, so its size IS the chunk
    // count; the previous `empty() ? 0 : size()` ternary was redundant because
    // an empty vector already reports size() == 0.
    return offsets_.size();
}

private:
// Storage - kept as separate members for cache efficiency
nd::array vstack_holder_;
std::vector<int64_t> offsets_;
std::vector<int64_t> range_offsets_;
std::vector<const void*> holders_;

// Zero-copy buffer cache: raw pointers to chunk buffer data and string offsets.
// SAFETY: These pointers remain valid as long as vstack_holder_ keeps the source array alive.
// This eliminates shared_ptr atomic reference counting in get_range_data() hot path.
std::vector<const uint8_t*> buffer_cache_;
std::vector<const uint32_t*> offsets_cache_;

const void* dynamic_std_holder_ = nullptr;
const void* dynamic_icm_holder_ = nullptr;
bool is_valid_ = true;

void initialize(const nd::array& arr);
void initialize_single_range(const auto& range_adapter);
void initialize_single_range(const auto& range_adapter, const nd::array& source_arr);
void initialize_complex(const nd::array& arr);
bool try_initialize_range_arrays(const auto& vstacked, const nd::array& fallback);
bool try_initialize_range_arrays(const auto& vstacked);
void clear_range_data();
std::string_view get_range_data(int64_t index) const;
std::string_view get_dynamic_data(int64_t index) const;
Expand Down
Loading
Loading