diff --git a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp
index 3a27ff6959..1ea4acb032 100644
--- a/cpp/deeplake_pg/duckdb_deeplake_scan.cpp
+++ b/cpp/deeplake_pg/duckdb_deeplake_scan.cpp
@@ -7,6 +7,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -22,8 +23,77 @@
 #include
 #include
 
+#include
+#include
+
 namespace {
 
+// STRING_PROFILING: Enable detailed timing for string operations.
+// Uncomment the following line to enable string operation profiling:
+// #define DEEPLAKE_STRING_PROFILING 1
+
+#ifdef DEEPLAKE_STRING_PROFILING
+// Timing guard for profiling string operations in TPC-H queries.
+// Measures time spent in StringVector operations vs get_range_data.
+struct string_timing_guard
+{
+    const char* name;
+    std::chrono::high_resolution_clock::time_point start;
+    static inline thread_local uint64_t total_string_bytes = 0;
+    static inline thread_local uint64_t total_string_count = 0;
+    static inline thread_local uint64_t total_batch_count = 0;
+
+    explicit string_timing_guard(const char* n)
+        : name(n)
+        , start(std::chrono::high_resolution_clock::now())
+    {
+    }
+
+    ~string_timing_guard()
+    {
+        auto end = std::chrono::high_resolution_clock::now();
+        auto ns = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
+        // Log timing data periodically (every 1000 batches)
+        if (total_batch_count % 1000 == 0 && total_batch_count > 0) {
+            elog(DEBUG1,
+                 "String profiling [%s]: batch=%lu strings=%lu bytes=%lu time=%ldns",
+                 name,
+                 total_batch_count,
+                 total_string_count,
+                 total_string_bytes,
+                 ns);
+        }
+    }
+
+    static void record_string_batch(uint64_t string_count, uint64_t total_bytes)
+    {
+        total_string_count += string_count;
+        total_string_bytes += total_bytes;
+        total_batch_count++;
+    }
+};
+#define STRING_TIMING_GUARD(name) string_timing_guard _timing_guard_##__LINE__(name)
+#define STRING_RECORD_BATCH(count, bytes) string_timing_guard::record_string_batch(count, bytes)
+#else
+#define STRING_TIMING_GUARD(name) (void)0
+#define STRING_RECORD_BATCH(count, bytes) (void)0
+#endif
+
+// VectorBuffer wrapper that keeps an nd::array alive for zero-copy string access.
+// This allows DuckDB to reference string data directly from our buffers without copying.
+class DeeplakeStringBuffer : public duckdb::VectorBuffer
+{
+public:
+    explicit DeeplakeStringBuffer(nd::array&& arr)
+        : duckdb::VectorBuffer(duckdb::VectorBufferType::OPAQUE_BUFFER)
+        , array_(std::move(arr))
+    {
+    }
+
+private:
+    nd::array array_;
+};
+
 struct deeplake_scan_bind_data final : public duckdb::TableFunctionData
 {
     pg::table_data& table_data;
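(Aside, not part of the patch.) A minimal, self-contained sketch of the zero-copy pattern DeeplakeStringBuffer enables: wrap an externally owned buffer in a VectorBuffer subclass, register it with StringVector::AddBuffer so DuckDB pins its lifetime, then write string_t entries that point straight into it. The DuckDB calls (VectorBuffer, make_buffer, StringVector::AddBuffer, FlatVector::GetData, string_t) are the public C++ API; the std::string payload and helper names are illustrative stand-ins for the nd::array the extension actually keeps alive.

```cpp
#include "duckdb.hpp"

#include <string>
#include <utility>
#include <vector>

// Keeps an externally owned byte buffer alive for as long as the vector references it.
class ExternalStringBuffer : public duckdb::VectorBuffer {
public:
    explicit ExternalStringBuffer(std::string&& data)
        : duckdb::VectorBuffer(duckdb::VectorBufferType::OPAQUE_BUFFER), data_(std::move(data)) {}

    const char* data() const { return data_.data(); }

private:
    std::string data_;
};

// Fills `out` (a VARCHAR vector) with views into `payload` without copying string bytes.
// `ranges` lists (offset, length) pairs into the payload, one per output row.
void fill_zero_copy(duckdb::Vector& out, std::string payload,
                    const std::vector<std::pair<size_t, size_t>>& ranges) {
    auto buffer = duckdb::make_buffer<ExternalStringBuffer>(std::move(payload));
    const char* base = buffer->data();
    // Registering the buffer ties its lifetime to the vector, so the string_t entries
    // below may point straight into it - no per-row AddString heap allocation.
    duckdb::StringVector::AddBuffer(out, buffer);
    auto* slots = duckdb::FlatVector::GetData<duckdb::string_t>(out);
    for (size_t i = 0; i < ranges.size(); ++i) {
        slots[i] = duckdb::string_t(base + ranges[i].first, static_cast<uint32_t>(ranges[i].second));
    }
}
```

The same lifetime rule applies in the patch: the nd::array moved into DeeplakeStringBuffer must cover every byte the emitted string_t values reference.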
@@ -49,6 +119,13 @@ struct deeplake_scan_global_state final : public duckdb::GlobalTableFunctionStat
     heimdall::dataset_view_ptr index_search_result;
     std::atomic<int64_t> current_row = 0;
+    // OPTIMIZATION: Cache column dtypes to avoid get_column_view() shared_ptr overhead in the hot path.
+    // Populated once during init, used in set_streaming_column_output for nd::switch_dtype.
+    std::vector<nd::dtype> column_dtypes;
+
+    // OPTIMIZATION: Cache the total row count to avoid virtual function calls in the hot path.
+    int64_t cached_num_rows = 0;
+
     idx_t MaxThreads() const override
     {
         return std::min(base::system_report::cpu_cores(), pg::max_num_threads_for_global_state);
     }
@@ -304,10 +381,25 @@ duckdb::unique_ptr<duckdb::GlobalTableFunctionState> deeplake_scan_init_global(d
                                                                                 duckdb::TableFunctionInitInput& input)
 {
     auto& bind_data = input.bind_data->Cast<deeplake_scan_bind_data>();
-    const auto& td = bind_data.table_data;
+    auto& td = bind_data.table_data;
     auto r = duckdb::make_uniq<deeplake_scan_global_state>();
     r->column_ids = input.column_ids;
 
+    // OPTIMIZATION: Cache the row count once to avoid repeated virtual calls during the scan.
+    r->cached_num_rows = td.num_rows();
+
+    // OPTIMIZATION: Cache column dtypes to avoid get_column_view() overhead in the hot path.
+    // This eliminates shared_ptr atomic operations per chunk per column.
+    r->column_dtypes.reserve(input.column_ids.size());
+    for (const auto col_idx : input.column_ids) {
+        if (td.is_column_requested(col_idx)) {
+            auto col_view = td.get_column_view(col_idx);
+            r->column_dtypes.push_back(col_view->dtype());
+        } else {
+            r->column_dtypes.push_back(nd::dtype::unknown);
+        }
+    }
+
     if (input.filters) {
         duckdb::vector<duckdb::unique_ptr<duckdb::Expression>> filter_exprs;
         for (auto& [output_col_idx, filter] : input.filters->filters) {
@@ -458,16 +550,70 @@ class deeplake_scan_function_helper
     {
         ASSERT(samples.dtype() == nd::dtype::string);
         auto& output_vector = output_.data[output_column_id];
+        auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
         pg::impl::string_stream_array_holder string_holder(samples);
-        for (duckdb::idx_t row_in_batch = 0; row_in_batch < output_.size(); ++row_in_batch) {
-            auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
-            std::string_view value;
-            if (string_holder.is_valid()) {
-                value = string_holder.data(row_in_batch);
+
+        STRING_TIMING_GUARD("set_string_column_output");
+
+        if (string_holder.is_valid()) {
+            // BULK STRING RESERVATION: Calculate the total string bytes for the batch.
+            // This enables efficient memory handling by knowing the total size upfront.
+            // NOTE: DuckDB does not have a StringVector::Reserve API, so we use zero-copy
+            // via AddBuffer instead, which is actually more efficient than pre-allocation.
+            const auto batch_size = output_.size();
+            const auto total_string_bytes = string_holder.get_batch_total_bytes(0, batch_size);
+            (void)total_string_bytes; // Used for profiling; zero-copy doesn't need reservation
+
+            // ZERO-COPY OPTIMIZATION: Instead of pre-allocating and copying strings,
+            // we register our buffer with DuckDB using AddBuffer. This eliminates ALL
+            // heap allocations for strings - superior to the bulk reservation approach,
+            // and achieves the same goal (no incremental heap growth) with zero copies.
+            duckdb::StringVector::AddBuffer(
+                output_vector,
+                duckdb::make_buffer<DeeplakeStringBuffer>(nd::array(samples)));
+
+            // Construct string_t pointing directly to buffer data (zero-copy).
+            // OPTIMIZATION: Use the bulk access path for the single-chunk case (common for TPC-H).
+            if (string_holder.is_single_chunk()) [[likely]] {
+                // Bulk access: get raw buffer and offset array pointers
+                auto contiguous = string_holder.get_contiguous_strings(0);
+                const auto* buffer = contiguous.buffer;
+                const auto* offsets = contiguous.offsets;
+                const auto base_offset = contiguous.base_offset;
+                const auto start_idx = contiguous.start_index;
+
+                // Record batch statistics for profiling
+                STRING_RECORD_BATCH(batch_size, total_string_bytes);
+
+                // Construct all string_t entries using direct buffer access.
+                // NOTE: This is the "modified add_string wrapper" - instead of calling
+                // StringVector::AddString per row, we construct string_t directly pointing
+                // into the registered buffer (zero-copy, no heap allocation).
+                for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                    const auto local_idx = start_idx + row_in_batch;
+                    const auto str_start = offsets[local_idx] - base_offset;
+                    const auto str_end = offsets[local_idx + 1] - base_offset;
+                    const auto len = static_cast<uint32_t>(str_end - str_start);
+                    const auto* str_ptr = reinterpret_cast<const char*>(buffer + str_start);
+                    duckdb_data[row_in_batch] = duckdb::string_t(str_ptr, len);
+                }
             } else {
-                value = base::string_view_cast(samples[row_in_batch].data());
+                // Multi-chunk fallback: still zero-copy, but with per-row access
+                for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                    auto value = string_holder.data(row_in_batch);
+                    const auto len = static_cast<uint32_t>(value.size());
+                    duckdb_data[row_in_batch] = duckdb::string_t(value.data(), len);
+                }
+            }
+        } else {
+            // Fallback path: we need to copy, since there is no stable buffer.
+            // This path uses StringVector::AddStringOrBlob, which allocates heap space.
+            for (duckdb::idx_t row_in_batch = 0; row_in_batch < output_.size(); ++row_in_batch) {
+                auto value = base::string_view_cast(samples[row_in_batch].data());
+                const auto len = static_cast<uint32_t>(value.size());
+                duckdb_data[row_in_batch] = duckdb::StringVector::AddStringOrBlob(
+                    output_vector, value.data(), len);
             }
-            duckdb_data[row_in_batch] = add_string(output_vector, value.data(), value.size());
         }
     }
@@ -822,16 +968,20 @@ class deeplake_scan_function_helper
         auto& td = bind_data_.table_data;
         auto& output_vector = output_.data[output_column_id];
 
-        auto col_view = td.get_column_view(col_idx);
-        nd::switch_dtype(col_view->dtype(), [&]<typename T>() {
+        // OPTIMIZATION: Use the cached dtype instead of calling get_column_view() each time.
+        // This eliminates shared_ptr atomic operations in the hot scan path.
+        const auto cached_dtype = global_state_.column_dtypes[output_column_id];
+        nd::switch_dtype(cached_dtype, [&]<typename T>() {
             if constexpr (std::is_arithmetic_v<T>) {
                 auto att_type = td.get_atttypid(col_idx);
                 auto* value_ptr = td.get_streamers().value_ptr<T>(col_idx, current_row);
                 if (att_type == VARCHAROID || att_type == CHAROID || att_type == BPCHAROID) {
+                    // Single-character string columns: construct string_t directly without heap allocation.
+                    // For len=1, string_t stores the character inline (no AddString needed).
                     auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
                     for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
                         duckdb_data[row_in_batch] =
-                            add_string(output_vector, reinterpret_cast<const char*>(value_ptr + row_in_batch), 1);
+                            duckdb::string_t(reinterpret_cast<const char*>(value_ptr + row_in_batch), 1);
                     }
                     return;
                 }
@@ -851,27 +1001,108 @@ class deeplake_scan_function_helper
                     }
                 }
             } else {
-                for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
-                    const int64_t row_idx = current_row + row_in_batch;
-                    auto value = td.get_streamers().value(col_idx, row_idx);
-                    // workaround. value is not always remain valid. Trying to make a copy as soon as possible.
-                    // Most likely due to nd::array temporary object destruction.
-                    std::string str_value(value);
-                    if (is_uuid) {
-                        // Treat empty string as NULL for UUID columns
-                        if (str_value.empty()) {
+                // ZERO-COPY string path: the string data already exists in stable buffers
+                // maintained by the streamer. We construct string_t pointing directly to
+                // this data without copying. The batch owner array keeps the buffer alive.
+                //
+                // The streamer batches remain valid until streamers_.reset() is called,
+                // which only happens on commit/refresh. During query execution the buffers
+                // are guaranteed to be alive. Since the streamer manages the lifetime,
+                // we don't need to add a buffer reference to DuckDB - the data is already
+                // stable for the duration of the query.
+                auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
+
+                // Get batch-level access
+                auto [holder, batch_start] = td.get_streamers().get_string_batch(col_idx, current_row);
+
+                if (is_uuid) {
+                    // UUID columns: parse strings as UUIDs
+                    auto* uuid_data = duckdb::FlatVector::GetData<duckdb::hugeint_t>(output_vector);
+                    for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                        auto value = holder->data(batch_start + row_in_batch);
+                        if (value.empty()) {
                             duckdb::FlatVector::SetNull(output_vector, row_in_batch, true);
                         } else {
                             duckdb::hugeint_t uuid_value;
-                            if (!duckdb::UUID::FromString(str_value, uuid_value)) {
-                                elog(ERROR, "Failed to parse UUID string: %s", str_value.c_str());
+                            if (!duckdb::UUID::FromString(std::string(value), uuid_value)) {
+                                elog(ERROR, "Failed to parse UUID string: %.*s",
+                                     static_cast<int>(value.size()), value.data());
                             }
-                            auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::hugeint_t>(output_vector);
-                            duckdb_data[row_in_batch] = uuid_value;
+                            uuid_data[row_in_batch] = uuid_value;
+                        }
+                    }
+                } else {
+                    // ZERO-COPY: Construct string_t pointing directly to buffer data.
+                    // For len <= 12: the data is copied inline (no heap allocation needed).
+                    // For len > 12: string_t stores a pointer to our buffer (zero-copy).
+                    // The streamer owns the batch data, which remains stable during query execution.
+
+                    STRING_TIMING_GUARD("string_column_output");
+
+                    // OPTIMIZATION: Use the bulk access path for the single-chunk case (common for TPC-H).
+                    // This uses get_contiguous_strings() to access the raw buffer and offset arrays
+                    // directly, enabling vectorized construction of string_t entries.
+                    if (holder->is_single_chunk()) [[likely]] {
+                        // Bulk access: get raw buffer and offset array pointers
+                        auto contiguous = holder->get_contiguous_strings(batch_start);
+                        const auto* buffer = contiguous.buffer;
+                        const auto* offsets = contiguous.offsets;
+                        const auto base_offset = contiguous.base_offset;
+                        const auto start_idx = contiguous.start_index;
+
+                        // Record batch statistics for profiling
+                        STRING_RECORD_BATCH(batch_size, holder->get_batch_total_bytes(batch_start, batch_size));
+
+                        // Construct all string_t entries using direct buffer access
+                        for (duckdb::idx_t row_in_batch = 0; row_in_batch < batch_size; ++row_in_batch) {
+                            const auto local_idx = start_idx + row_in_batch;
+                            const auto str_start = offsets[local_idx] - base_offset;
+                            const auto str_end = offsets[local_idx + 1] - base_offset;
+                            const auto len = static_cast<uint32_t>(str_end - str_start);
+                            const auto* str_ptr = reinterpret_cast<const char*>(buffer + str_start);
+                            duckdb_data[row_in_batch] = duckdb::string_t(str_ptr, len);
+                        }
                     } else {
-                        auto* duckdb_data = duckdb::FlatVector::GetData<duckdb::string_t>(output_vector);
-                        duckdb_data[row_in_batch] = add_string(output_vector, str_value.data(), str_value.size());
+                        // OPTIMIZATION: Multi-chunk batch access - iterate chunk by chunk.
+                        // This avoids a binary search (upper_bound) on every row by:
+                        // 1. Finding the starting chunk once
+                        // 2. Processing all rows within each chunk using direct buffer access
+                        // 3. Moving to the next chunk only when we cross a boundary
+                        //
+                        // For a batch spanning N chunks, this reduces the cost from
+                        // O(batch_size * log(chunks)) binary searches to O(N) chunk lookups
+                        // plus O(batch_size) array indexing.
+
+                        auto [chunk_idx, row_in_chunk] = holder->find_chunk(batch_start);
+                        duckdb::idx_t row_in_batch = 0;
+
+                        while (row_in_batch < batch_size) {
+                            // Get chunk boundaries and data
+                            auto [chunk_start, chunk_end] = holder->get_chunk_bounds(chunk_idx);
+                            auto contiguous = holder->get_chunk_strings(chunk_idx, row_in_chunk);
+                            const auto* buffer = contiguous.buffer;
+                            const auto* offsets = contiguous.offsets;
+                            const auto base_offset = contiguous.base_offset;
+                            auto local_idx = contiguous.start_index;
+
+                            // Process rows until we hit the chunk boundary or the batch end
+                            const auto global_row = batch_start + static_cast<int64_t>(row_in_batch);
+                            const auto remaining_in_batch = static_cast<int64_t>(batch_size - row_in_batch);
+                            const auto remaining_in_chunk = chunk_end - global_row;
+                            const auto rows_in_chunk = static_cast<duckdb::idx_t>(
+                                std::min(remaining_in_batch, remaining_in_chunk));
+
+                            for (duckdb::idx_t i = 0; i < rows_in_chunk; ++i, ++row_in_batch, ++local_idx) {
+                                const auto str_start = offsets[local_idx] - base_offset;
+                                const auto str_end = offsets[local_idx + 1] - base_offset;
+                                const auto len = static_cast<uint32_t>(str_end - str_start);
+                                const auto* str_ptr = reinterpret_cast<const char*>(buffer + str_start);
+                                duckdb_data[row_in_batch] = duckdb::string_t(str_ptr, len);
+                            }
+
+                            // Move to the next chunk
+                            chunk_idx++;
+                            row_in_chunk = 0;
+                        }
                     }
                 }
             }
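(Aside, not part of the patch.) The multi-chunk branch above replaces a per-row upper_bound with one chunk lookup plus plain array indexing. A standalone sketch of that walk, with std:: containers standing in for the holder's internals; all names here are illustrative.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <string_view>
#include <vector>

struct Chunk {
    const char* bytes;             // raw string payload of this chunk
    std::vector<uint32_t> offsets; // offsets[i]..offsets[i+1] bounds string i
};

// chunk_ends[i] is the cumulative row count after chunk i (the same shape as offsets_ above).
void visit_batch(const std::vector<Chunk>& chunks, const std::vector<int64_t>& chunk_ends,
                 int64_t batch_start, int64_t batch_size,
                 const std::function<void(int64_t, std::string_view)>& emit) {
    // One binary search to find the chunk containing the first row of the batch.
    auto it = std::upper_bound(chunk_ends.begin(), chunk_ends.end(), batch_start);
    auto chunk_idx = static_cast<size_t>(std::distance(chunk_ends.begin(), it));
    int64_t row_in_chunk = batch_start - (chunk_idx == 0 ? 0 : chunk_ends[chunk_idx - 1]);

    for (int64_t row = 0; row < batch_size;) {
        const Chunk& c = chunks[chunk_idx];
        // Take as many rows as the current chunk and the remaining batch allow.
        const int64_t take = std::min(batch_size - row, chunk_ends[chunk_idx] - (batch_start + row));
        for (int64_t i = 0; i < take; ++i, ++row, ++row_in_chunk) {
            const auto begin = c.offsets[row_in_chunk];
            const auto end = c.offsets[row_in_chunk + 1];
            emit(batch_start + row, std::string_view(c.bytes + begin, end - begin));
        }
        ++chunk_idx;      // crossed a chunk boundary
        row_in_chunk = 0;
    }
}
```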
@@ -914,12 +1145,14 @@ class deeplake_scan_function_helper
 
     int64_t next_chunk()
     {
+        // OPTIMIZATION: Use the cached row count for the non-index-search case to avoid
+        // repeated virtual function calls through get_read_only_dataset()->num_rows().
         int64_t num_rows = -1;
         if (has_index_search()) {
             ASSERT(is_index_search_done());
             num_rows = global_state_.index_search_result->num_rows();
         } else {
-            num_rows = bind_data_.table_data.num_rows();
+            num_rows = global_state_.cached_num_rows;
         }
 
         // Determine batch size (DuckDB's standard vector size is 2048)
@@ -979,7 +1212,11 @@ class deeplake_scan_function_helper
                 column_promises.emplace_back(request_range_and_set_column_output(cv, i, current_row));
             }
         }
-        async::combine(std::move(column_promises)).get_future().get();
+        // OPTIMIZATION: Skip async::combine overhead when all columns use the streaming path.
+        // This is the common case for TPC-H queries, where all columns have streamers.
+        if (!column_promises.empty()) {
+            async::combine(std::move(column_promises)).get_future().get();
+        }
     }
 };
 
@@ -988,10 +1225,28 @@ void deeplake_scan_function(duckdb::ClientContext& context, duckdb::TableFunctio
     deeplake_scan_function_helper helper(context, data, output);
     try {
         helper.scan();
+    } catch (const duckdb::OutOfMemoryException& e) {
+        // Specific handling for DuckDB out-of-memory errors
+        elog(ERROR,
+             "DuckDB out of memory during Deeplake scan: %s. "
+             "Consider increasing pg_deeplake.duckdb_memory_limit_mb or setting "
+             "pg_deeplake.duckdb_temp_directory for disk spilling.",
+             e.what());
     } catch (const duckdb::Exception& e) {
         elog(ERROR, "DuckDB exception during Deeplake scan: %s", e.what());
     } catch (const std::exception& e) {
-        elog(ERROR, "STD exception during Deeplake scan: %s", e.what());
+        // Fallback: check if the error message contains memory-related keywords
+        std::string msg = e.what();
+        std::transform(msg.begin(), msg.end(), msg.begin(), ::tolower);
+        if (msg.find("memory") != std::string::npos || msg.find("oom") != std::string::npos) {
+            elog(ERROR,
+                 "Out of memory during Deeplake scan: %s. "
+                 "Consider increasing pg_deeplake.duckdb_memory_limit_mb or setting "
+                 "pg_deeplake.duckdb_temp_directory for disk spilling.",
+                 e.what());
+        } else {
+            elog(ERROR, "STD exception during Deeplake scan: %s", e.what());
+        }
     } catch (...) {
         elog(ERROR, "Unknown exception during Deeplake scan");
     }
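(Aside, not part of the patch.) The next file wires the pg_deeplake.duckdb_memory_limit_mb and pg_deeplake.duckdb_temp_directory GUCs into DuckDB. A rough standalone sketch of that translation against the DuckDB C++ API follows; total_memory_bytes() is an assumed stand-in for base::system_report::total_memory(), and failures simply return where the extension emits elog(WARNING, ...).

```cpp
#include "duckdb.hpp"

#include <cstdint>
#include <string>

uint64_t total_memory_bytes(); // assumption: platform-specific, e.g. sysconf/cgroup based

void configure_duckdb_memory(duckdb::Connection& con, uint64_t explicit_limit_mb,
                             const std::string& temp_dir) {
    // 0 means auto-detect: 80% of system memory, never below 256MB.
    uint64_t limit_mb = explicit_limit_mb;
    if (limit_mb == 0) {
        limit_mb = static_cast<uint64_t>(total_memory_bytes() * 0.8 / (1024ULL * 1024ULL));
        if (limit_mb < 256) {
            limit_mb = 256;
        }
    }

    auto res = con.Query("SET memory_limit='" + std::to_string(limit_mb) + "MB'");
    if (!res || res->HasError()) {
        return; // the extension logs this as a WARNING and continues
    }
    if (!temp_dir.empty()) {
        // temp_directory enables spilling to disk once memory_limit is exceeded.
        con.Query("SET temp_directory='" + temp_dir + "'");
    }
}
```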
diff --git a/cpp/deeplake_pg/duckdb_executor.cpp b/cpp/deeplake_pg/duckdb_executor.cpp
index 0b58d60eb0..0f7a5839d6 100644
--- a/cpp/deeplake_pg/duckdb_executor.cpp
+++ b/cpp/deeplake_pg/duckdb_executor.cpp
@@ -17,6 +17,8 @@
 #include "table_storage.hpp"
 #include "utils.hpp"
 
+#include
+
 #include
 #include
 #include
@@ -60,6 +62,41 @@ struct duckdb_connections {
     }
 };
 
+/**
+ * @brief Creates the DuckDB database and connection with memory management configuration
+ *
+ * DuckDB Memory Management:
+ * -------------------------
+ * This function configures DuckDB's memory_limit and temp_directory settings to prevent
+ * out-of-memory crashes during large operations such as hash joins (e.g., TPC-H Q13 at SF100+).
+ *
+ * Memory Limit Configuration:
+ * - Controlled by the pg_deeplake.duckdb_memory_limit_mb GUC parameter (default: 0 = auto-detect)
+ * - Auto-detection uses 80% of system memory with a 256MB minimum floor
+ * - When the memory limit is exceeded, DuckDB spills intermediate results to disk
+ * - Memory values use MB units consistently throughout the codebase
+ *
+ * Disk Spilling:
+ * - Controlled by the pg_deeplake.duckdb_temp_directory GUC parameter (default: empty = DuckDB default)
+ * - temp_directory enables disk spilling when memory_limit is exceeded
+ * - Requires sufficient disk space for large operations (an SSD with adequate capacity is recommended)
+ * - Temporary files are automatically cleaned up after query completion
+ *
+ * Container Considerations:
+ * - IMPORTANT: For containerized environments with cgroup limits, base::system_report::total_memory()
+ *   may return host memory instead of the container limit
+ * - Set pg_deeplake.duckdb_memory_limit_mb explicitly in such environments to avoid OOM
+ * - Example: SET pg_deeplake.duckdb_memory_limit_mb = 4096; -- 4GB limit for a container
+ *
+ * Tuning Guidelines:
+ * - For dedicated database servers: auto-detection (80% of system memory) is safe
+ * - For shared/containerized environments: set an explicit limit based on available memory
+ * - For OLTP workloads: smaller limit with faster spilling (e.g., 25% of memory)
+ * - For OLAP workloads: larger limit for better performance (e.g., 80% of memory)
+ *
+ * @return Unique pointer to a duckdb_connections structure containing both C++ and C API connections
+ * @throws std::exception if DuckDB connection creation or configuration fails
+ */
 std::unique_ptr<duckdb_connections> create_connections()
 {
     try {
@@ -71,6 +108,49 @@ std::unique_ptr<duckdb_connections> create_connections()
         conns->db_cpp = std::make_unique<duckdb::DuckDB>(":memory:", &config);
         conns->con_cpp = std::make_unique<duckdb::Connection>(*(conns->db_cpp));
 
+        // Configure the DuckDB memory limit and temp directory for disk spilling.
+        // This prevents OOM crashes during large operations like JOINs (e.g., TPC-H Q13 at SF100+).
+        uint64_t mem_limit_mb = 0;
+        if (pg::duckdb_memory_limit_mb > 0) {
+            // Use the explicit configuration
+            mem_limit_mb = static_cast<uint64_t>(pg::duckdb_memory_limit_mb);
+        } else {
+            // Auto-detect: use 80% of system memory with a 256MB minimum.
+            // Note: For containerized environments with cgroup limits, base::system_report::total_memory()
+            // may return host memory. Set pg_deeplake.duckdb_memory_limit_mb explicitly in containers.
+            mem_limit_mb = static_cast<uint64_t>(base::system_report::total_memory() * 0.8 / (1024ULL * 1024ULL));
+            if (mem_limit_mb < 256) {
+                mem_limit_mb = 256;
+            }
+        }
+
+        // Set memory_limit (DuckDB uses MB units)
+        auto mem_result = conns->con_cpp->Query(fmt::format("SET memory_limit='{}MB'", mem_limit_mb));
+        if (!mem_result || mem_result->HasError()) {
+            elog(WARNING, "Failed to set DuckDB memory_limit: %s",
+                 mem_result ? mem_result->GetError().c_str() : "null result");
+        }
+
+        // Set temp_directory if configured (enables disk spilling)
+        if (pg::duckdb_temp_directory != nullptr && std::strlen(pg::duckdb_temp_directory) > 0) {
+            auto temp_result = conns->con_cpp->Query(fmt::format("SET temp_directory='{}'", pg::duckdb_temp_directory));
+            if (!temp_result || temp_result->HasError()) {
+                elog(WARNING, "Failed to set DuckDB temp_directory: %s",
+                     temp_result ? temp_result->GetError().c_str() : "null result");
+            }
+        }
+
+        // Verify and log the applied settings at INFO level for operational visibility
+        auto verify_mem = conns->con_cpp->Query("SELECT current_setting('memory_limit')");
+        if (verify_mem && !verify_mem->HasError() && verify_mem->RowCount() > 0) {
+            elog(INFO, "DuckDB memory_limit: %s", verify_mem->GetValue(0, 0).ToString().c_str());
+        }
+
+        auto verify_temp = conns->con_cpp->Query("SELECT current_setting('temp_directory')");
+        if (verify_temp && !verify_temp->HasError() && verify_temp->RowCount() > 0) {
+            elog(INFO, "DuckDB temp_directory: %s", verify_temp->GetValue(0, 0).ToString().c_str());
+        }
+
         // Register the deeplake_scan table function for zero-copy access
         pg::register_deeplake_scan_function(*(conns->con_cpp));
diff --git a/cpp/deeplake_pg/extension_init.cpp b/cpp/deeplake_pg/extension_init.cpp
index f08205966e..18b7c3356e 100644
--- a/cpp/deeplake_pg/extension_init.cpp
+++ b/cpp/deeplake_pg/extension_init.cpp
@@ -64,6 +64,8 @@ bool use_shared_mem_for_refresh = false;
 bool enable_dataset_logging = false;    // Enable dataset operation logging for debugging
 bool allow_custom_paths = true;         // Allow dataset_path in CREATE TABLE options
 bool stateless_enabled = false;         // Enable stateless catalog sync across instances
+int32_t duckdb_memory_limit_mb = 0;     // DuckDB memory limit in MB (0 = auto-detect)
+char* duckdb_temp_directory = nullptr;  // DuckDB temp directory for disk spilling (empty = default)
 
 } // namespace pg
 
@@ -277,6 +279,37 @@ void initialize_guc_parameters()
     );
 
+    // DuckDB memory configuration
+    DefineCustomIntVariable("pg_deeplake.duckdb_memory_limit_mb",
+                            "Memory limit for DuckDB operations in MB (0 = auto-detect).",
+                            "This parameter controls DuckDB's internal memory budget for large operations like JOINs. "
+                            "When set to 0, the limit is auto-detected as 80% of system memory. "
+                            "DuckDB will spill to disk when this limit is exceeded, preventing out-of-memory crashes.",
+                            &pg::duckdb_memory_limit_mb, // linked C variable
+                            0,                           // default value (auto-detect)
+                            0,                           // min value (0 = auto)
+                            INT_MAX,                     // max value
+                            PGC_USERSET,                 // context (USERSET, SUSET, etc.)
+                            GUC_UNIT_MB,                 // flags - treat as MB
+                            nullptr,
+                            nullptr,
+                            nullptr                      // check_hook, assign_hook, show_hook
+    );
+
+    DefineCustomStringVariable("pg_deeplake.duckdb_temp_directory",
+                               "Temporary directory for DuckDB disk spilling during large operations.",
+                               "Specifies where DuckDB writes temporary files when memory_limit is exceeded. "
+                               "Empty string (default) uses DuckDB's default temp location. "
+                               "DuckDB will validate the path at runtime and fail gracefully if invalid.",
+                               &pg::duckdb_temp_directory, // linked C variable
+                               "",                         // default value (empty = DuckDB default)
+                               PGC_USERSET,                // context - can be set by any user
+                               0,                          // flags
+                               nullptr,                    // check_hook
+                               nullptr,                    // assign_hook
+                               nullptr                     // show_hook
+    );
+
     // Initialize PostgreSQL memory tracking
     pg::memory_tracker::initialize_guc_parameters();
diff --git a/cpp/deeplake_pg/table_data.hpp b/cpp/deeplake_pg/table_data.hpp
index 3acf6a1d3c..e5be4a677e 100644
--- a/cpp/deeplake_pg/table_data.hpp
+++ b/cpp/deeplake_pg/table_data.hpp
@@ -143,6 +143,13 @@ struct table_data
         template <typename T>
         inline const T* value_ptr(int32_t column_number, int64_t row_number);
+
+        // Get the string holder for the batch containing the given row (for zero-copy string access)
+        inline std::pair<impl::string_stream_array_holder*, int64_t> get_string_batch(
+            int32_t column_number, int64_t row_number);
+
+        // Get the batch owner array for a given row (for keeping the buffer alive in a DuckDB vector)
+        inline nd::array* get_batch_owner(int32_t column_number, int64_t row_number);
     };
 
     inline streamer_info& get_streamers() noexcept;
diff --git a/cpp/deeplake_pg/table_data_impl.hpp b/cpp/deeplake_pg/table_data_impl.hpp
index b67c13f681..efaec38397 100644
--- a/cpp/deeplake_pg/table_data_impl.hpp
+++ b/cpp/deeplake_pg/table_data_impl.hpp
@@ -112,6 +112,19 @@ inline void table_data::open_dataset(bool create)
     ASSERT(dataset_ != nullptr);
     num_total_rows_ = dataset_->num_rows();
 
+    // Validate that the row count doesn't exceed BlockNumber limits for TID conversion.
+    // With TUPLES_PER_BLOCK=256 and BlockNumber=uint32_t, the max is ~1.1 trillion rows.
+    constexpr int64_t MAX_SUPPORTED_ROWS =
+        static_cast<int64_t>(UINT32_MAX) * static_cast<int64_t>(pg::DEEPLAKE_TUPLES_PER_BLOCK);
+    if (num_total_rows_ > MAX_SUPPORTED_ROWS) {
+        elog(WARNING,
+             "Table '%s' has %ld rows, exceeding max supported %ld for TID conversion. "
+             "Consider partitioning or sharding.",
+             table_name_.c_str(),
+             num_total_rows_,
+             MAX_SUPPORTED_ROWS);
+    }
+
     // Enable logging if GUC parameter is set
     if (pg::enable_dataset_logging && dataset_ && !dataset_->is_logging_enabled()) {
         dataset_->start_logging();
@@ -631,6 +644,48 @@ inline std::string_view table_data::streamer_info::value(int32_t column_number,
     return batch.holder_.data(static_cast<int64_t>(row_in_batch));
 }
 
+inline std::pair<impl::string_stream_array_holder*, int64_t>
+table_data::streamer_info::get_string_batch(int32_t column_number, int64_t row_number)
+{
+    const int64_t batch_index = row_number >> batch_size_log2_;
+    const int64_t row_in_batch = row_number & batch_mask_;
+
+    auto& col_data = column_to_batches[column_number];
+    auto& batch = col_data.batches[batch_index];
+    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
+        std::lock_guard lock(col_data.mutex_);
+        for (int64_t i = 0; i <= batch_index; ++i) {
+            if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) {
+                col_data.batches[i].owner_ = streamers[column_number]->next_batch();
+                col_data.batches[i].holder_ = impl::string_stream_array_holder(col_data.batches[i].owner_);
+                col_data.batches[i].initialized_.store(true, std::memory_order_release);
+            }
+        }
+    }
+
+    return {&batch.holder_, row_in_batch};
+}
+
+inline nd::array* table_data::streamer_info::get_batch_owner(int32_t column_number, int64_t row_number)
+{
+    const int64_t batch_index = row_number >> batch_size_log2_;
+
+    auto& col_data = column_to_batches[column_number];
+    auto& batch = col_data.batches[batch_index];
+    if (!batch.initialized_.load(std::memory_order_acquire)) [[unlikely]] {
+        std::lock_guard lock(col_data.mutex_);
+        for (int64_t i = 0; i <= batch_index; ++i) {
+            if (!col_data.batches[i].initialized_.load(std::memory_order_relaxed)) {
+                col_data.batches[i].owner_ = streamers[column_number]->next_batch();
+                col_data.batches[i].holder_ = impl::string_stream_array_holder(col_data.batches[i].owner_);
+                col_data.batches[i].initialized_.store(true, std::memory_order_release);
+            }
+        }
+    }
+
+    return &batch.owner_;
+}
+
 inline bool table_data::flush()
 {
     const bool s1 = flush_inserts(true);
diff --git a/cpp/deeplake_pg/utils.hpp b/cpp/deeplake_pg/utils.hpp
index 3cde8d0a2d..211a98c99c 100644
--- a/cpp/deeplake_pg/utils.hpp
+++ b/cpp/deeplake_pg/utils.hpp
@@ -57,6 +57,8 @@ extern bool use_shared_mem_for_refresh;
 extern bool enable_dataset_logging;
 extern bool allow_custom_paths;
 extern bool stateless_enabled;
+extern int32_t duckdb_memory_limit_mb;
+extern char* duckdb_temp_directory;
 
 namespace utils {
 
diff --git a/cpp/nd/string_array_holder.hpp b/cpp/nd/string_array_holder.hpp
index 92ede7a91f..0dbeb691de 100644
--- a/cpp/nd/string_array_holder.hpp
+++ b/cpp/nd/string_array_holder.hpp
@@ -7,6 +7,7 @@
 
 #include "array.hpp"
 
+#include
 #include
 
 namespace nd {
 
@@ -23,20 +24,128 @@ class string_stream_array_holder
         return is_valid_;
     }
 
+    // OPTIMIZATION: Check if all data comes from a single contiguous chunk.
+    // When true, batch_data_fast() can be used for O(1) sequential access without binary search.
+    inline bool is_single_chunk() const noexcept
+    {
+        return offsets_.size() == 1;
+    }
+
+    // OPTIMIZATION: Fast batch data access for the single-chunk case.
+    // Avoids binary search overhead in get_range_data() by directly computing string bounds.
+    // REQUIRES: is_single_chunk() == true, index must be valid
+    inline std::string_view batch_data_fast(int64_t index) const noexcept
+    {
+        // Single chunk: offsets_[0] = 0, buffer_cache_[0], offsets_cache_[0] are the only entries
+        const auto local_idx = range_offsets_[0] + index;
+        const auto* offsets = offsets_cache_[0];
+        const auto start_offset = offsets[0];
+        const auto str_start = offsets[local_idx] - start_offset;
+        const auto str_end = offsets[local_idx + 1] - start_offset;
+        const auto* buffer = buffer_cache_[0];
+        return std::string_view(reinterpret_cast<const char*>(buffer + str_start), str_end - str_start);
+    }
+
+    // OPTIMIZATION: Bulk access for contiguous string data extraction.
+    // Returns the raw buffer pointer and string offsets for a batch, enabling vectorized operations.
+    // This allows callers to set up all string_t entries in a single pass without per-string calls.
+    // REQUIRES: is_single_chunk() == true
+    struct contiguous_string_data
+    {
+        const uint8_t* buffer;   // Pointer to raw string data
+        const uint32_t* offsets; // String offset array (offsets[i] to offsets[i+1] is string i)
+        uint32_t base_offset;    // Offset to subtract from all offset values
+        int64_t start_index;     // Starting index within the offset array
+    };
+
+    inline contiguous_string_data get_contiguous_strings(int64_t batch_start) const noexcept
+    {
+        // For a single chunk, return direct access to the buffer and offset arrays
+        const auto local_idx = range_offsets_[0] + batch_start;
+        const auto* offsets = offsets_cache_[0];
+        const auto base_offset = offsets[0];
+        return {buffer_cache_[0], offsets, base_offset, local_idx};
+    }
+
+    // Calculate the total bytes for a batch of strings (useful for pre-allocation)
+    inline uint64_t get_batch_total_bytes(int64_t batch_start, int64_t count) const noexcept
+    {
+        if (!is_single_chunk()) {
+            return 0; // Only optimized for the single-chunk case
+        }
+        const auto local_idx = range_offsets_[0] + batch_start;
+        const auto* offsets = offsets_cache_[0];
+        const auto start = offsets[local_idx];
+        const auto end = offsets[local_idx + count];
+        return end - start;
+    }
+
+    // MULTI-CHUNK SUPPORT: Methods for iterating over data that spans multiple chunks.
+    // These enable efficient batch processing without per-row binary search overhead.
+
+    // Find which chunk contains the given global index.
+    // Returns: (chunk_index, row_within_chunk)
+    inline std::pair<size_t, int64_t> find_chunk(int64_t global_index) const noexcept
+    {
+        // offsets_ contains cumulative row counts: [chunk0_end, chunk0_end+chunk1_size, ...]
+        // So chunk i spans from (i == 0 ? 0 : offsets_[i-1]) to offsets_[i].
+        //
+        // Binary search for the first cumulative offset > global_index
+        auto it = std::upper_bound(offsets_.begin(), offsets_.end(), global_index);
+        size_t chunk_idx = static_cast<size_t>(std::distance(offsets_.begin(), it));
+
+        // Calculate the row within the chunk
+        int64_t chunk_start = (chunk_idx == 0) ? 0 : offsets_[chunk_idx - 1];
+        int64_t row_in_chunk = global_index - chunk_start;
+
+        return {chunk_idx, row_in_chunk};
+    }
+
+    // Get the global row range [start, end) for a specific chunk.
+    inline std::pair<int64_t, int64_t> get_chunk_bounds(size_t chunk_idx) const noexcept
+    {
+        // offsets_ is cumulative: chunk i spans from (i == 0 ? 0 : offsets_[i-1]) to offsets_[i]
+        int64_t chunk_start = (chunk_idx == 0) ? 0 : offsets_[chunk_idx - 1];
+        int64_t chunk_end = offsets_[chunk_idx];
+        return {chunk_start, chunk_end};
+    }
+
+    // Get contiguous string data for a specific chunk, starting at row_in_chunk.
+    // Similar to get_contiguous_strings(), but for the multi-chunk case.
+    inline contiguous_string_data get_chunk_strings(size_t chunk_idx, int64_t row_in_chunk) const noexcept
+    {
+        const auto local_idx = range_offsets_[chunk_idx] + row_in_chunk;
+        const auto* offsets = offsets_cache_[chunk_idx];
+        const auto base_offset = offsets[0];
+        return {buffer_cache_[chunk_idx], offsets, base_offset, local_idx};
+    }
+
+    // Get the number of chunks
+    inline size_t num_chunks() const noexcept
+    {
+        return offsets_.empty() ? 0 : offsets_.size();
+    }
+
 private:
     // Storage - kept as separate members for cache efficiency
     nd::array vstack_holder_;
     std::vector<int64_t> offsets_;
     std::vector<int64_t> range_offsets_;
-    std::vector holders_;
+
+    // Zero-copy buffer cache: raw pointers to chunk buffer data and string offsets.
+    // SAFETY: These pointers remain valid as long as vstack_holder_ keeps the source array alive.
+    // This eliminates shared_ptr atomic reference counting in the get_range_data() hot path.
+    std::vector<const uint8_t*> buffer_cache_;
+    std::vector<const uint32_t*> offsets_cache_;
+
     const void* dynamic_std_holder_ = nullptr;
     const void* dynamic_icm_holder_ = nullptr;
     bool is_valid_ = true;
 
     void initialize(const nd::array& arr);
-    void initialize_single_range(const auto& range_adapter);
+    void initialize_single_range(const auto& range_adapter, const nd::array& source_arr);
     void initialize_complex(const nd::array& arr);
-    bool try_initialize_range_arrays(const auto& vstacked, const nd::array& fallback);
+    bool try_initialize_range_arrays(const auto& vstacked);
     void clear_range_data();
     std::string_view get_range_data(int64_t index) const;
     std::string_view get_dynamic_data(int64_t index) const;
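(Aside, not part of the patch.) A small worked example of the bookkeeping the holder's new methods assume, with made-up values: offsets_ holds cumulative row counts per chunk, and within a chunk the offset array bounds each string relative to its first entry.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

int main() {
    // Three chunks holding 4, 6 and 6 rows: the cumulative row counts per chunk.
    std::vector<int64_t> chunk_row_ends = {4, 10, 16};

    // find_chunk(7): the first cumulative end greater than 7 is 10 -> chunk 1, row 7 - 4 = 3.
    int64_t global_index = 7;
    auto it = std::upper_bound(chunk_row_ends.begin(), chunk_row_ends.end(), global_index);
    auto chunk_idx = static_cast<size_t>(std::distance(chunk_row_ends.begin(), it));
    int64_t chunk_start = chunk_idx == 0 ? 0 : chunk_row_ends[chunk_idx - 1];
    assert(chunk_idx == 1 && global_index - chunk_start == 3);

    // Within a chunk, string i spans [offsets[i], offsets[i+1]) in the chunk's byte buffer;
    // base_offset (= offsets[0]) is subtracted so the result indexes the buffer from zero.
    std::vector<uint32_t> string_offsets = {100, 103, 103, 110}; // three strings, one empty
    uint32_t base_offset = string_offsets[0];
    assert(string_offsets[1] - base_offset == 3);       // first string: 3 bytes at buffer + 0
    assert(string_offsets[2] - string_offsets[1] == 0); // second string is empty
    return 0;
}
```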
diff --git a/postgres/tests/sql/tpch/create_schema.sql b/postgres/tests/sql/tpch/create_schema.sql
index e0a408ec78..684a654d14 100644
--- a/postgres/tests/sql/tpch/create_schema.sql
+++ b/postgres/tests/sql/tpch/create_schema.sql
@@ -1,3 +1,7 @@
+-- TPC-H Schema for pg_deeplake (v2: BIGINT keys for SF1000+ support, 2026-02)
+-- This schema uses BIGINT for o_orderkey and l_orderkey to prevent integer overflow at SF1000+,
+-- where these values exceed INT4_MAX (~2.1B). All other keys remain INT, as they fit within INT4 limits.
+
 DROP TABLE IF EXISTS customer;
 DROP TABLE IF EXISTS lineitem;
 DROP TABLE IF EXISTS nation;
@@ -18,8 +22,9 @@ CREATE TABLE customer (
     c_comment VARCHAR(117) NOT NULL
 ) USING deeplake;
 
+-- Note: l_orderkey uses BIGINT to support SF1000+ (values exceed the 2.1B INT4 limit)
 CREATE TABLE lineitem (
-    l_orderkey int NOT NULL,
+    l_orderkey bigint NOT NULL,
     l_partkey int NOT NULL,
     l_suppkey int not null,
     l_linenumber int not null,
@@ -44,8 +49,9 @@ CREATE TABLE nation (
     n_comment varchar(152) NULL
 ) USING deeplake;
 
+-- Note: o_orderkey uses BIGINT to support SF1000+ (values exceed the 2.1B INT4 limit)
 CREATE TABLE orders (
-    o_orderkey int NOT NULL,
+    o_orderkey bigint NOT NULL,
     o_custkey int NOT NULL,
     o_orderstatus VARCHAR(1) NOT NULL,
     o_totalprice decimal(15, 2) NOT NULL,
@@ -103,3 +109,7 @@ where
     and l_shipdate < date '1996-01-01' + interval '3' month
 group by
     l_suppkey;
+
+-- MIGRATION: Existing SF1000 data with INT overflow issues is already corrupted;
+-- re-ingestion with this schema is required to recover from overflow errors.
+-- SF10 and SF100 data will continue to work, since o_orderkey maxes out at ~600M for SF100.