diff --git a/DESCRIPTION b/DESCRIPTION index c70e9d27f..f0e516db7 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -1,6 +1,6 @@ Package: duckdb Title: DBI Package for the DuckDB Database Management System -Version: 1.1.3.9032 +Version: 1.1.3.9033 Authors@R: c( person("Hannes", "Mühleisen", , "hannes@cwi.nl", role = "aut", comment = c(ORCID = "0000-0001-8552-0029")), diff --git a/NEWS.md b/NEWS.md index 66c580fa8..452fe023a 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,22 @@ +# duckdb 1.1.3.9033 + +## vendor + +- Update vendored sources to duckdb/duckdb@adc6f607a71b87da2d0a7550e90db623e9bea637 (#959). + +- Update vendored sources to duckdb/duckdb@13ba13c121acfb3f4c48c16337297ac705779c19 (#958). + +- Update vendored sources to duckdb/duckdb@45462bcffd761b7d797cc1ab660930be62c110cb (#957). + +- Update vendored sources to duckdb/duckdb@a0a828b712f538a64263dad251d20a5f91f83a80 (#956). + +- Update vendored sources to duckdb/duckdb@2082b55f89fe6e810f982c57dceecbee5ecd40fa (#955). + +- Update vendored sources to duckdb/duckdb@7ee114cea8a43d9cdd0f0442cbde05c63a65a9c3 (#954). + + # duckdb 1.1.3.9032 ## vendor diff --git a/src/duckdb/src/common/exception.cpp b/src/duckdb/src/common/exception.cpp index 9b30eedb5..991f929f2 100644 --- a/src/duckdb/src/common/exception.cpp +++ b/src/duckdb/src/common/exception.cpp @@ -66,7 +66,6 @@ bool Exception::InvalidatesTransaction(ExceptionType exception_type) { bool Exception::InvalidatesDatabase(ExceptionType exception_type) { switch (exception_type) { - case ExceptionType::INTERNAL: case ExceptionType::FATAL: return true; default: diff --git a/src/duckdb/src/common/stacktrace.cpp b/src/duckdb/src/common/stacktrace.cpp index ab73be9f4..7a42b35cf 100644 --- a/src/duckdb/src/common/stacktrace.cpp +++ b/src/duckdb/src/common/stacktrace.cpp @@ -21,7 +21,7 @@ static string UnmangleSymbol(string symbol) { } } for (idx_t i = mangle_start; i < symbol.size(); i++) { - if (StringUtil::CharacterIsSpace(symbol[i])) { + if (StringUtil::CharacterIsSpace(symbol[i]) || symbol[i] == ')' || symbol[i] == '+') { mangle_end = i; break; } @@ -44,6 +44,45 @@ static string UnmangleSymbol(string symbol) { return result; } +static string CleanupStackTrace(string symbol) { +#ifdef __APPLE__ + // structure of frame pointers is [depth] [library] [pointer] [symbol] + // we are only interested in [depth] and [symbol] + + // find the depth + idx_t start; + for (start = 0; start < symbol.size(); start++) { + if (!StringUtil::CharacterIsDigit(symbol[start])) { + break; + } + } + + // now scan forward until we find the frame pointer + idx_t frame_end = symbol.size(); + for (idx_t i = start; i + 1 < symbol.size(); ++i) { + if (symbol[i] == '0' && symbol[i + 1] == 'x') { + idx_t k; + for (k = i + 2; k < symbol.size(); ++k) { + if (!StringUtil::CharacterIsHex(symbol[k])) { + break; + } + } + frame_end = k; + break; + } + } + static constexpr idx_t STACK_TRACE_INDENTATION = 8; + if (frame_end == symbol.size() || start >= STACK_TRACE_INDENTATION) { + // frame pointer not found - just preserve the original frame + return symbol; + } + idx_t space_count = STACK_TRACE_INDENTATION - start; + return symbol.substr(0, start) + string(space_count, ' ') + symbol.substr(frame_end, symbol.size() - frame_end); +#else + return symbol; +#endif +} + string StackTrace::GetStacktracePointers(idx_t max_depth) { string result; auto callstack = unique_ptr(new void *[max_depth]); @@ -68,7 +107,7 @@ string StackTrace::ResolveStacktraceSymbols(const string &pointers) { string result; char **strs = 
backtrace_symbols(callstack.get(), NumericCast(frame_count)); for (idx_t i = 0; i < frame_count; i++) { - result += UnmangleSymbol(strs[i]); + result += CleanupStackTrace(UnmangleSymbol(strs[i])); result += "\n"; } free(reinterpret_cast(strs)); diff --git a/src/duckdb/src/common/types.cpp b/src/duckdb/src/common/types.cpp index 52002ded4..4beade0ef 100644 --- a/src/duckdb/src/common/types.cpp +++ b/src/duckdb/src/common/types.cpp @@ -988,6 +988,72 @@ static bool CombineUnequalTypes(const LogicalType &left, const LogicalType &righ return false; } +template +static bool CombineStructTypes(const LogicalType &left, const LogicalType &right, LogicalType &result) { + auto &left_children = StructType::GetChildTypes(left); + auto &right_children = StructType::GetChildTypes(right); + + auto left_unnamed = StructType::IsUnnamed(left); + auto is_unnamed = left_unnamed || StructType::IsUnnamed(right); + child_list_t child_types; + + // At least one side is unnamed, so we attempt positional casting. + if (is_unnamed) { + if (left_children.size() != right_children.size()) { + // We can't cast, or create the super-set. + return false; + } + + for (idx_t i = 0; i < left_children.size(); i++) { + LogicalType child_type; + if (!OP::Operation(left_children[i].second, right_children[i].second, child_type)) { + return false; + } + auto &child_name = left_unnamed ? right_children[i].first : left_children[i].first; + child_types.emplace_back(child_name, std::move(child_type)); + } + result = LogicalType::STRUCT(child_types); + return true; + } + + // Create a super-set of the STRUCT fields. + // First, create a name->index map of the right children. + case_insensitive_map_t right_children_map; + for (idx_t i = 0; i < right_children.size(); i++) { + auto &name = right_children[i].first; + right_children_map[name] = i; + } + + for (idx_t i = 0; i < left_children.size(); i++) { + auto &left_child = left_children[i]; + auto right_child_it = right_children_map.find(left_child.first); + + if (right_child_it == right_children_map.end()) { + // We can directly put the left child. + child_types.emplace_back(left_child.first, left_child.second); + continue; + } + + // We need to recurse to ensure the children have a maximum logical type. + LogicalType child_type; + auto &right_child = right_children[right_child_it->second]; + if (!OP::Operation(left_child.second, right_child.second, child_type)) { + return false; + } + child_types.emplace_back(left_child.first, std::move(child_type)); + right_children_map.erase(right_child_it); + } + + // Add all remaining right children. 
+ for (const auto &right_child_it : right_children_map) { + auto &right_child = right_children[right_child_it.second]; + child_types.emplace_back(right_child.first, right_child.second); + } + + result = LogicalType::STRUCT(child_types); + return true; +} + template static bool CombineEqualTypes(const LogicalType &left, const LogicalType &right, LogicalType &result) { // Since both left and right are equal we get the left type as our type_id for checks @@ -1059,31 +1125,7 @@ static bool CombineEqualTypes(const LogicalType &left, const LogicalType &right, return true; } case LogicalTypeId::STRUCT: { - // struct: perform recursively on each child - auto &left_child_types = StructType::GetChildTypes(left); - auto &right_child_types = StructType::GetChildTypes(right); - bool left_unnamed = StructType::IsUnnamed(left); - auto any_unnamed = left_unnamed || StructType::IsUnnamed(right); - if (left_child_types.size() != right_child_types.size()) { - // child types are not of equal size, we can't cast - // return false - return false; - } - child_list_t child_types; - for (idx_t i = 0; i < left_child_types.size(); i++) { - LogicalType child_type; - // Child names must be in the same order OR either one of the structs must be unnamed - if (!any_unnamed && !StringUtil::CIEquals(left_child_types[i].first, right_child_types[i].first)) { - return false; - } - if (!OP::Operation(left_child_types[i].second, right_child_types[i].second, child_type)) { - return false; - } - auto &child_name = left_unnamed ? right_child_types[i].first : left_child_types[i].first; - child_types.emplace_back(child_name, std::move(child_type)); - } - result = LogicalType::STRUCT(child_types); - return true; + return CombineStructTypes(left, right, result); } case LogicalTypeId::UNION: { auto left_member_count = UnionType::GetMemberCount(left); diff --git a/src/duckdb/src/common/types/vector.cpp b/src/duckdb/src/common/types/vector.cpp index 325246ad9..722b8b699 100644 --- a/src/duckdb/src/common/types/vector.cpp +++ b/src/duckdb/src/common/types/vector.cpp @@ -54,10 +54,10 @@ UnifiedVectorFormat &UnifiedVectorFormat::operator=(UnifiedVectorFormat &&other) return *this; } -Vector::Vector(LogicalType type_p, bool create_data, bool zero_data, idx_t capacity) +Vector::Vector(LogicalType type_p, bool create_data, bool initialize_to_zero, idx_t capacity) : vector_type(VectorType::FLAT_VECTOR), type(std::move(type_p)), data(nullptr), validity(capacity) { if (create_data) { - Initialize(zero_data, capacity); + Initialize(initialize_to_zero, capacity); } } @@ -306,7 +306,7 @@ void Vector::Slice(const SelectionVector &sel, idx_t count, SelCache &cache) { } } -void Vector::Initialize(bool zero_data, idx_t capacity) { +void Vector::Initialize(bool initialize_to_zero, idx_t capacity) { auxiliary.reset(); validity.Reset(); auto &type = GetType(); @@ -325,7 +325,7 @@ void Vector::Initialize(bool zero_data, idx_t capacity) { if (type_size > 0) { buffer = VectorBuffer::CreateStandardVector(type, capacity); data = buffer->GetData(); - if (zero_data) { + if (initialize_to_zero) { memset(data, 0, capacity * type_size); } } @@ -1374,10 +1374,10 @@ void Vector::Deserialize(Deserializer &deserializer, idx_t count) { } void Vector::SetVectorType(VectorType vector_type_p) { - this->vector_type = vector_type_p; + vector_type = vector_type_p; auto physical_type = GetType().InternalType(); - if (TypeIsConstantSize(physical_type) && - (GetVectorType() == VectorType::CONSTANT_VECTOR || GetVectorType() == VectorType::FLAT_VECTOR)) { + auto flat_or_const = 
GetVectorType() == VectorType::CONSTANT_VECTOR || GetVectorType() == VectorType::FLAT_VECTOR; + if (TypeIsConstantSize(physical_type) && flat_or_const) { auxiliary.reset(); } if (vector_type == VectorType::CONSTANT_VECTOR && physical_type == PhysicalType::STRUCT) { @@ -1782,23 +1782,29 @@ void Vector::DebugShuffleNestedVector(Vector &vector, idx_t count) { void FlatVector::SetNull(Vector &vector, idx_t idx, bool is_null) { D_ASSERT(vector.GetVectorType() == VectorType::FLAT_VECTOR); vector.validity.Set(idx, !is_null); - if (is_null) { - auto &type = vector.GetType(); - auto internal_type = type.InternalType(); - if (internal_type == PhysicalType::STRUCT) { - // set all child entries to null as well - auto &entries = StructVector::GetEntries(vector); - for (auto &entry : entries) { - FlatVector::SetNull(*entry, idx, is_null); - } - } else if (internal_type == PhysicalType::ARRAY) { - // set the child element in the array to null as well - auto &child = ArrayVector::GetEntry(vector); - auto array_size = ArrayType::GetSize(type); - auto child_offset = idx * array_size; - for (idx_t i = 0; i < array_size; i++) { - FlatVector::SetNull(child, child_offset + i, is_null); - } + if (!is_null) { + return; + } + + auto &type = vector.GetType(); + auto internal_type = type.InternalType(); + + // Set all child entries to NULL. + if (internal_type == PhysicalType::STRUCT) { + auto &entries = StructVector::GetEntries(vector); + for (auto &entry : entries) { + FlatVector::SetNull(*entry, idx, is_null); + } + return; + } + + // Set all child entries to NULL. + if (internal_type == PhysicalType::ARRAY) { + auto &child = ArrayVector::GetEntry(vector); + auto array_size = ArrayType::GetSize(type); + auto child_offset = idx * array_size; + for (idx_t i = 0; i < array_size; i++) { + FlatVector::SetNull(child, child_offset + i, is_null); } } } diff --git a/src/duckdb/src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp b/src/duckdb/src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp index 7e3df336a..967660a8e 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/scanner/string_value_scanner.cpp @@ -684,6 +684,19 @@ bool LineError::HandleErrors(StringValueResult &result) { result.state_machine.options, cur_error.current_line_size, lines_per_batch, borked_line, result.current_line_position.begin.GetGlobalPosition(result.requested_size, first_nl), result.path); break; + case INVALID_STATE: + if (result.current_line_position.begin == line_pos) { + csv_error = CSVError::InvalidState( + result.state_machine.options, col_idx, lines_per_batch, borked_line, + result.current_line_position.begin.GetGlobalPosition(result.requested_size, first_nl), + line_pos.GetGlobalPosition(result.requested_size, first_nl), result.path); + } else { + csv_error = CSVError::InvalidState( + result.state_machine.options, col_idx, lines_per_batch, borked_line, + result.current_line_position.begin.GetGlobalPosition(result.requested_size, first_nl), + line_pos.GetGlobalPosition(result.requested_size), result.path); + } + break; default: throw InvalidInputException("CSV Error not allowed when inserting row"); } @@ -878,7 +891,11 @@ bool StringValueResult::AddRow(StringValueResult &result, const idx_t buffer_pos } void StringValueResult::InvalidState(StringValueResult &result) { - result.current_errors.Insert(UNTERMINATED_QUOTES, result.cur_col_id, result.chunk_col_id, result.last_position); + if (result.quoted) { + 
result.current_errors.Insert(UNTERMINATED_QUOTES, result.cur_col_id, result.chunk_col_id, result.last_position); + } else { + result.current_errors.Insert(INVALID_STATE, result.cur_col_id, result.chunk_col_id, result.last_position); + } } bool StringValueResult::EmptyLine(StringValueResult &result, const idx_t buffer_pos) { @@ -1724,11 +1741,18 @@ void StringValueScanner::FinalizeChunkProcess() { // If we are not done we have two options. // 1) If a boundary is set. if (iterator.IsBoundarySet()) { - bool has_unterminated_quotes = false; - if (!result.current_errors.HasErrorType(UNTERMINATED_QUOTES)) { + bool found_error = false; + CSVErrorType type = UNTERMINATED_QUOTES; + if (!result.current_errors.HasErrorType(UNTERMINATED_QUOTES) && + !result.current_errors.HasErrorType(INVALID_STATE)) { iterator.done = true; } else { - has_unterminated_quotes = true; + found_error = true; + if (result.current_errors.HasErrorType(UNTERMINATED_QUOTES)) { + type = UNTERMINATED_QUOTES; + } else { + type = INVALID_STATE; + } } // We read until the next line or until we have nothing else to read. // Move to next buffer @@ -1747,18 +1771,21 @@ void StringValueScanner::FinalizeChunkProcess() { } } else { if (result.current_errors.HasErrorType(UNTERMINATED_QUOTES)) { - has_unterminated_quotes = true; + found_error = true; + type = UNTERMINATED_QUOTES; + } else if (result.current_errors.HasErrorType(INVALID_STATE)) { + found_error = true; + type = INVALID_STATE; } if (result.current_errors.HandleErrors(result)) { result.number_of_rows++; } } - if (states.IsQuotedCurrent() && !has_unterminated_quotes && + if (states.IsQuotedCurrent() && !found_error && state_machine->dialect_options.state_machine_options.rfc_4180.GetValue()) { // If we finish the execution of a buffer, and we end in a quoted state, it means we have unterminated // quotes - result.current_errors.Insert(UNTERMINATED_QUOTES, result.cur_col_id, result.chunk_col_id, - result.last_position); + result.current_errors.Insert(type, result.cur_col_id, result.chunk_col_id, result.last_position); if (result.current_errors.HandleErrors(result)) { result.number_of_rows++; } diff --git a/src/duckdb/src/execution/operator/csv_scanner/state_machine/csv_state_machine_cache.cpp b/src/duckdb/src/execution/operator/csv_scanner/state_machine/csv_state_machine_cache.cpp index 99d863c74..29fda8863 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/state_machine/csv_state_machine_cache.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/state_machine/csv_state_machine_cache.cpp @@ -75,10 +75,10 @@ void CSVStateMachineCache::Insert(const CSVStateMachineOptions &state_machine_op transition_array[static_cast<uint8_t>('\r')][state] = CSVState::CARRIAGE_RETURN; if (state == static_cast<uint8_t>(CSVState::STANDARD_NEWLINE)) { transition_array[static_cast<uint8_t>('\n')][state] = CSVState::STANDARD; + } else if (!state_machine_options.rfc_4180.GetValue()) { + transition_array[static_cast<uint8_t>('\n')][state] = CSVState::RECORD_SEPARATOR; } else { - if (!state_machine_options.rfc_4180.GetValue()) { - transition_array[static_cast<uint8_t>('\n')][state] = CSVState::RECORD_SEPARATOR; - } + transition_array[static_cast<uint8_t>('\n')][state] = CSVState::INVALID; } } else { transition_array[static_cast<uint8_t>('\r')][state] = CSVState::RECORD_SEPARATOR; diff --git a/src/duckdb/src/execution/operator/csv_scanner/table_function/global_csv_state.cpp b/src/duckdb/src/execution/operator/csv_scanner/table_function/global_csv_state.cpp index 3874e74df..f6d0d386e 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/table_function/global_csv_state.cpp +++ 
b/src/duckdb/src/execution/operator/csv_scanner/table_function/global_csv_state.cpp @@ -36,7 +36,7 @@ CSVGlobalState::CSVGlobalState(ClientContext &context_p, const shared_ptrstart_iterator; current_boundary.SetCurrentBoundaryToPosition(single_threaded); if (current_boundary.done && context.client_data->debug_set_max_line_length) { @@ -59,7 +59,7 @@ double CSVGlobalState::GetProgress(const ReadCSVData &bind_data_p) const { if (file_scans.front()->file_size == 0) { percentage = 1.0; } else { - // for compressed files, readed bytes may greater than files size. + // for compressed files, bytes read may be greater than the file size. for (auto &file : file_scans) { double file_progress; if (!file->buffer_manager) { @@ -73,7 +73,8 @@ double CSVGlobalState::GetProgress(const ReadCSVData &bind_data_p) const { file_progress = static_cast<double>(file->bytes_read); } // This file is an uncompressed file, so we use the more precise bytes_read from the scanner - percentage += (double(1) / double(total_files)) * std::min(1.0, file_progress / double(file->file_size)); + percentage += (static_cast<double>(1) / static_cast<double>(total_files)) * + std::min(1.0, file_progress / static_cast<double>(file->file_size)); } } return percentage * 100; @@ -187,7 +188,7 @@ idx_t CSVGlobalState::MaxThreads() const { if (single_threaded || !file_scans.front()->on_disk_file) { return system_threads; } - idx_t total_threads = file_scans.front()->file_size / CSVIterator::BYTES_PER_THREAD + 1; + const idx_t total_threads = file_scans.front()->file_size / CSVIterator::BYTES_PER_THREAD + 1; if (total_threads < system_threads) { return total_threads; @@ -200,7 +201,7 @@ void CSVGlobalState::DecrementThread() { D_ASSERT(running_threads > 0); running_threads--; if (running_threads == 0) { - bool ignore_or_store_errors = + const bool ignore_or_store_errors = bind_data.options.ignore_errors.GetValue() || bind_data.options.store_rejects.GetValue(); if (!single_threaded && !ignore_or_store_errors) { // If we are running multithreaded and not ignoring errors, we must run the validator @@ -224,6 +225,7 @@ bool IsCSVErrorAcceptedReject(CSVErrorType type) { case CSVErrorType::MAXIMUM_LINE_SIZE: case CSVErrorType::UNTERMINATED_QUOTES: case CSVErrorType::INVALID_UNICODE: + case CSVErrorType::INVALID_STATE: return true; default: return false; @@ -244,6 +246,8 @@ string CSVErrorTypeToEnum(CSVErrorType type) { return "UNQUOTED VALUE"; case CSVErrorType::INVALID_UNICODE: return "INVALID UNICODE"; + case CSVErrorType::INVALID_STATE: + return "INVALID STATE"; default: throw InternalException("CSV Error is not valid to be stored in a Rejects Table"); } @@ -308,7 +312,7 @@ void FillScanErrorTable(InternalAppender &scan_appender, idx_t scan_idx, idx_t f scan_appender.EndRow(); } -void CSVGlobalState::FillRejectsTable() { +void CSVGlobalState::FillRejectsTable() const { auto &options = bind_data.options; if (options.store_rejects.GetValue()) { diff --git a/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp b/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp index cbb586441..5c01d1fe9 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/util/csv_error.cpp @@ -179,6 +179,20 @@ CSVError CSVError::LineSizeError(const CSVReaderOptions &options, idx_t actual_s how_to_fix_it.str(), current_path); } +CSVError CSVError::InvalidState(const CSVReaderOptions &options, idx_t current_column, LinesPerBoundary error_info, + string &csv_row, idx_t row_byte_position, optional_idx byte_position, + 
const string &current_path) { + std::ostringstream error; + error << "The CSV Parser state machine reached an invalid state.\nThis can happen when it is not possible to parse " + "your CSV File with the given options, or the CSV File is not RFC 4180 compliant "; + + std::ostringstream how_to_fix_it; + how_to_fix_it << "Possible fixes:" << '\n'; + how_to_fix_it << "* Enable scanning files that are not RFC 4180 compliant (rfc_4180=false)." << '\n'; + + return CSVError(error.str(), INVALID_STATE, current_column, csv_row, error_info, row_byte_position, byte_position, + options, how_to_fix_it.str(), current_path); +} CSVError CSVError::HeaderSniffingError(const CSVReaderOptions &options, const vector<HeaderValue> &best_header_row, const idx_t column_count, const string &delimiter) { std::ostringstream error; @@ -228,6 +242,7 @@ CSVError CSVError::HeaderSniffingError(const CSVReaderOptions &options, const ve if (!options.null_padding) { error << "* Enable null padding (null_padding=true) to pad missing columns with NULL values" << '\n'; } + return CSVError(error.str(), SNIFFING, {}); } @@ -287,6 +302,10 @@ CSVError CSVError::SniffingError(const CSVReaderOptions &options, const string & error << "* Check you are using the correct file compression, otherwise set it (e.g., compression = \'zstd\')" << '\n'; + if (options.dialect_options.state_machine_options.rfc_4180.GetValue() != false || + !options.dialect_options.state_machine_options.rfc_4180.IsSetByUser()) { + error << "* Enable scanning files that are not RFC 4180 compliant (rfc_4180=false). " << '\n'; + } return CSVError(error.str(), SNIFFING, {}); } diff --git a/src/duckdb/src/execution/operator/csv_scanner/util/csv_reader_options.cpp b/src/duckdb/src/execution/operator/csv_scanner/util/csv_reader_options.cpp index 2b05b242b..1a2ee65da 100644 --- a/src/duckdb/src/execution/operator/csv_scanner/util/csv_reader_options.cpp +++ b/src/duckdb/src/execution/operator/csv_scanner/util/csv_reader_options.cpp @@ -8,11 +8,11 @@ namespace duckdb { -CSVReaderOptions::CSVReaderOptions(CSVOption<char> single_byte_delimiter, +CSVReaderOptions::CSVReaderOptions(const CSVOption<char> single_byte_delimiter, const CSVOption<string> &multi_byte_delimiter) { if (multi_byte_delimiter.GetValue().empty()) { - char single_byte_value = single_byte_delimiter.GetValue(); - string value(1, single_byte_value); + const char single_byte_value = single_byte_delimiter.GetValue(); + const string value(1, single_byte_value); dialect_options.state_machine_options.delimiter = value; } else { dialect_options.state_machine_options.delimiter = multi_byte_delimiter; @@ -32,7 +32,9 @@ static bool ParseBoolean(const vector<Value> &set, const string &loption) { } static bool ParseBoolean(const Value &value, const string &loption) { - + if (value.IsNull()) { + throw BinderException("\"%s\" expects a non-null boolean value (e.g. 
TRUE or 1)", loption); + } if (value.type().id() == LogicalTypeId::LIST) { auto &children = ListValue::GetChildren(value); return ParseBoolean(children, loption); @@ -62,6 +64,9 @@ static string ParseString(const Value &value, const string &loption) { } static int64_t ParseInteger(const Value &value, const string &loption) { + if (value.IsNull()) { + throw BinderException("\"%s\" expects a non-null integer value", loption); + } if (value.type().id() == LogicalTypeId::LIST) { auto &children = ListValue::GetChildren(value); if (children.size() != 1) { @@ -224,7 +229,7 @@ void CSVReaderOptions::SetReadOption(const string &loption, const Value &value, if (loption == "auto_detect") { auto_detect = ParseBoolean(value, loption); } else if (loption == "sample_size") { - auto sample_size_option = ParseInteger(value, loption); + const auto sample_size_option = ParseInteger(value, loption); if (sample_size_option < 1 && sample_size_option != -1) { throw BinderException("Unsupported parameter for SAMPLE_SIZE: cannot be smaller than 1"); } @@ -543,6 +548,13 @@ void CSVReaderOptions::Verify() { } } +bool GetBooleanValue(const pair &option) { + if (option.second.IsNull()) { + throw BinderException("read_csv %s cannot be NULL", option.first); + } + return BooleanValue::Get(option.second); +} + void CSVReaderOptions::FromNamedParameters(const named_parameter_map_t &in, ClientContext &context) { map ordered_user_defined_parameters; for (auto &kv : in) { @@ -672,9 +684,9 @@ void CSVReaderOptions::FromNamedParameters(const named_parameter_map_t &in, Clie sql_type_list.push_back(std::move(def_type)); } } else if (loption == "all_varchar") { - all_varchar = BooleanValue::Get(kv.second); + all_varchar = GetBooleanValue(kv); } else if (loption == "normalize_names") { - normalize_names = BooleanValue::Get(kv.second); + normalize_names = GetBooleanValue(kv); } else { SetReadOption(loption, kv.second, name_list); } @@ -686,7 +698,6 @@ void CSVReaderOptions::FromNamedParameters(const named_parameter_map_t &in, Clie user_defined_parameters.erase(user_defined_parameters.size() - 2); } } - //! 
This function is used to remember options set by the sniffer, for use in ReadCSVRelation void CSVReaderOptions::ToNamedParameters(named_parameter_map_t &named_params) const { auto &delimiter = dialect_options.state_machine_options.delimiter; diff --git a/src/duckdb/src/execution/operator/persistent/csv_rejects_table.cpp b/src/duckdb/src/execution/operator/persistent/csv_rejects_table.cpp index 11e8c1b0e..0674181f2 100644 --- a/src/duckdb/src/execution/operator/persistent/csv_rejects_table.cpp +++ b/src/duckdb/src/execution/operator/persistent/csv_rejects_table.cpp @@ -63,14 +63,17 @@ void CSVRejectsTable::InitializeTable(ClientContext &context, const ReadCSVData // Create CSV_ERROR_TYPE ENUM string enum_name = "CSV_ERROR_TYPE"; - Vector order_errors(LogicalType::VARCHAR, 6); + constexpr uint8_t number_of_accepted_errors = 7; + Vector order_errors(LogicalType::VARCHAR, number_of_accepted_errors); order_errors.SetValue(0, "CAST"); order_errors.SetValue(1, "MISSING COLUMNS"); order_errors.SetValue(2, "TOO MANY COLUMNS"); order_errors.SetValue(3, "UNQUOTED VALUE"); order_errors.SetValue(4, "LINE SIZE OVER MAXIMUM"); order_errors.SetValue(5, "INVALID UNICODE"); - LogicalType enum_type = LogicalType::ENUM(enum_name, order_errors, 6); + order_errors.SetValue(6, "INVALID STATE"); + + LogicalType enum_type = LogicalType::ENUM(enum_name, order_errors, number_of_accepted_errors); auto type_info = make_uniq(enum_name, enum_type); type_info->temporary = true; type_info->on_conflict = OnCreateConflict::IGNORE_ON_CONFLICT; diff --git a/src/duckdb/src/execution/operator/persistent/physical_insert.cpp b/src/duckdb/src/execution/operator/persistent/physical_insert.cpp index 736dffc9d..dac3491fe 100644 --- a/src/duckdb/src/execution/operator/persistent/physical_insert.cpp +++ b/src/duckdb/src/execution/operator/persistent/physical_insert.cpp @@ -712,6 +712,7 @@ SinkCombineResultType PhysicalInsert::Combine(ExecutionContext &context, Operato } else { // we have written rows to disk optimistically - merge directly into the transaction-local storage lstate.writer->WriteLastRowGroup(*lstate.local_collection); + lstate.writer->FinalFlush(); gstate.table.GetStorage().LocalMerge(context.client, *lstate.local_collection); gstate.table.GetStorage().FinalizeOptimisticWriter(context.client, *lstate.writer); } diff --git a/src/duckdb/src/execution/physical_plan/plan_cte.cpp b/src/duckdb/src/execution/physical_plan/plan_cte.cpp index 190cb9319..9c6596279 100644 --- a/src/duckdb/src/execution/physical_plan/plan_cte.cpp +++ b/src/duckdb/src/execution/physical_plan/plan_cte.cpp @@ -24,7 +24,7 @@ unique_ptr PhysicalPlanGenerator::CreatePlan(LogicalMaterializ auto right = CreatePlan(*op.children[1]); unique_ptr cte; - cte = make_uniq(op.ctename, op.table_index, op.children[1]->types, std::move(left), std::move(right), + cte = make_uniq(op.ctename, op.table_index, right->types, std::move(left), std::move(right), op.estimated_cardinality); cte->working_table = working_table; cte->cte_scans = materialized_ctes[op.table_index]; diff --git a/src/duckdb/src/function/cast/struct_cast.cpp b/src/duckdb/src/function/cast/struct_cast.cpp index 94f494c1c..e5902b752 100644 --- a/src/duckdb/src/function/cast/struct_cast.cpp +++ b/src/duckdb/src/function/cast/struct_cast.cpp @@ -1,3 +1,4 @@ +#include "duckdb/common/exception/binder_exception.hpp" #include "duckdb/function/cast/default_casts.hpp" #include "duckdb/function/cast/cast_function_set.hpp" #include "duckdb/function/cast/bound_cast_data.hpp" @@ -7,51 +8,67 @@ namespace duckdb { 
unique_ptr<BoundCastData> StructBoundCastData::BindStructToStructCast(BindCastInput &input, const LogicalType &source, const LogicalType &target) { vector<BoundCastInfo> child_cast_info; - auto &source_child_types = StructType::GetChildTypes(source); - auto &result_child_types = StructType::GetChildTypes(target); + auto &source_children = StructType::GetChildTypes(source); + auto &target_children = StructType::GetChildTypes(target); auto target_is_unnamed = StructType::IsUnnamed(target); auto source_is_unnamed = StructType::IsUnnamed(source); - if (source_child_types.size() != result_child_types.size()) { + auto is_unnamed = target_is_unnamed || source_is_unnamed; + if (is_unnamed && source_children.size() != target_children.size()) { throw TypeMismatchException(input.query_location, source, target, "Cannot cast STRUCTs of different size"); } - bool named_struct_cast = !source_is_unnamed && !target_is_unnamed; - case_insensitive_map_t<idx_t> target_members; - if (named_struct_cast) { - for (idx_t i = 0; i < result_child_types.size(); i++) { - auto &target_name = result_child_types[i].first; - if (target_members.find(target_name) != target_members.end()) { - throw NotImplementedException("Error while casting - duplicate name \"%s\" in struct", target_name); + + case_insensitive_map_t<idx_t> target_children_map; + if (!is_unnamed) { + for (idx_t i = 0; i < target_children.size(); i++) { + auto &name = target_children[i].first; + if (target_children_map.find(name) != target_children_map.end()) { + throw NotImplementedException("Error while casting - duplicate name \"%s\" in struct", name); } - target_members[target_name] = i; + target_children_map[name] = i; } } - vector<idx_t> child_member_map; - child_member_map.reserve(source_child_types.size()); - for (idx_t source_idx = 0; source_idx < source_child_types.size(); source_idx++) { - auto &source_child = source_child_types[source_idx]; - idx_t target_idx; - if (named_struct_cast) { - // named struct cast - find corresponding member in target - auto entry = target_members.find(source_child.first); - if (entry == target_members.end()) { - throw TypeMismatchException(input.query_location, source, target, - "Cannot cast STRUCTs - element \"" + source_child.first + - "\" in source struct was not found in target struct"); + + vector<idx_t> source_indexes; + vector<idx_t> target_indexes; + vector<idx_t> target_null_indexes; + bool has_any_match = is_unnamed; + + for (idx_t i = 0; i < source_children.size(); i++) { + auto &source_child = source_children[i]; + auto target_idx = i; + + // Map to the correct index for named structs. + if (!is_unnamed) { + auto target_child = target_children_map.find(source_child.first); + if (target_child == target_children_map.end()) { + // Skip any children that have no target. 
+ continue; } - target_idx = entry->second; - target_members.erase(entry); - } else { - // unnamed struct cast - positionally cast elements - target_idx = source_idx; + target_idx = target_child->second; + target_children_map.erase(target_child); + has_any_match = true; } - child_member_map.push_back(target_idx); - auto child_cast = input.GetCastFunction(source_child.second, result_child_types[target_idx].second); + + source_indexes.push_back(i); + target_indexes.push_back(target_idx); + auto child_cast = input.GetCastFunction(source_child.second, target_children[target_idx].second); child_cast_info.push_back(std::move(child_cast)); } - D_ASSERT(child_member_map.size() == source_child_types.size()); - return make_uniq<StructBoundCastData>(std::move(child_cast_info), target, std::move(child_member_map)); + + if (!has_any_match) { + throw BinderException("STRUCT to STRUCT cast must have at least one matching member"); + } + + // The remaining target children have no match in the source struct. + // Thus, they become NULL. + for (const auto &target_child : target_children_map) { + target_null_indexes.push_back(target_child.second); + } + + return make_uniq<StructBoundCastData>(std::move(child_cast_info), target, std::move(source_indexes), + std::move(target_indexes), std::move(target_null_indexes)); } unique_ptr<FunctionLocalState> StructBoundCastData::InitStructCastLocalState(CastLocalStateParameters &parameters) { @@ -71,32 +88,46 @@ unique_ptr StructBoundCastData::InitStructCastLocalState(Cas static bool StructToStructCast(Vector &source, Vector &result, idx_t count, CastParameters &parameters) { auto &cast_data = parameters.cast_data->Cast<StructBoundCastData>(); - auto &lstate = parameters.local_state->Cast<StructCastLocalState>(); - auto &source_child_types = StructType::GetChildTypes(source.GetType()); - auto &source_children = StructVector::GetEntries(source); - D_ASSERT(source_children.size() == StructType::GetChildTypes(result.GetType()).size()); + auto &l_state = parameters.local_state->Cast<StructCastLocalState>(); + + auto &source_vectors = StructVector::GetEntries(source); + auto &target_children = StructVector::GetEntries(result); - auto &result_children = StructVector::GetEntries(result); bool all_converted = true; - for (idx_t c_idx = 0; c_idx < source_child_types.size(); c_idx++) { - auto source_idx = c_idx; - auto target_idx = cast_data.child_member_map[source_idx]; - auto &source_child_vector = *source_children[source_idx]; - auto &result_child_vector = *result_children[target_idx]; - CastParameters child_parameters(parameters, cast_data.child_cast_info[c_idx].cast_data, - lstate.local_states[c_idx]); - if (!cast_data.child_cast_info[c_idx].function(source_child_vector, result_child_vector, count, - child_parameters)) { + for (idx_t i = 0; i < cast_data.source_indexes.size(); i++) { + auto source_idx = cast_data.source_indexes[i]; + auto target_idx = cast_data.target_indexes[i]; + + auto &source_vector = *source_vectors[source_idx]; + auto &target_vector = *target_children[target_idx]; + + auto &child_cast_info = cast_data.child_cast_info[i]; + CastParameters child_parameters(parameters, child_cast_info.cast_data, l_state.local_states[i]); + auto success = child_cast_info.function(source_vector, target_vector, count, child_parameters); + if (!success) { all_converted = false; } } + + if (!cast_data.target_null_indexes.empty()) { + for (idx_t i = 0; i < cast_data.target_null_indexes.size(); i++) { + auto target_idx = cast_data.target_null_indexes[i]; + auto &target_vector = *target_children[target_idx]; + + target_vector.SetVectorType(VectorType::CONSTANT_VECTOR); + ConstantVector::SetNull(target_vector, 
true); + } + } + if (source.GetVectorType() == VectorType::CONSTANT_VECTOR) { result.SetVectorType(VectorType::CONSTANT_VECTOR); ConstantVector::SetNull(result, ConstantVector::IsNull(source)); - } else { - source.Flatten(count); - FlatVector::Validity(result) = FlatVector::Validity(source); + return all_converted; } + + source.Flatten(count); + auto &result_validity = FlatVector::Validity(result); + result_validity = FlatVector::Validity(source); return all_converted; } diff --git a/src/duckdb/src/function/table/sniff_csv.cpp b/src/duckdb/src/function/table/sniff_csv.cpp index c278dd723..5c72aabb9 100644 --- a/src/duckdb/src/function/table/sniff_csv.cpp +++ b/src/duckdb/src/function/table/sniff_csv.cpp @@ -38,9 +38,15 @@ static unique_ptr CSVSniffInitGlobal(ClientContext &co static unique_ptr CSVSniffBind(ClientContext &context, TableFunctionBindInput &input, vector &return_types, vector &names) { auto result = make_uniq(); + if (input.inputs[0].IsNull()) { + throw BinderException("sniff_csv cannot take NULL as a file path parameter"); + } result->path = input.inputs[0].ToString(); auto it = input.named_parameters.find("auto_detect"); if (it != input.named_parameters.end()) { + if (it->second.IsNull()) { + throw BinderException("\"%s\" expects a non-null boolean value (e.g. TRUE or 1)", it->first); + } if (!it->second.GetValue()) { throw InvalidInputException("sniff_csv function does not accept auto_detect variable set to false"); } @@ -169,7 +175,7 @@ static void CSVSniffFunction(ClientContext &context, TableFunctionInput &data_p, // 6. Skip Rows output.SetValue(5, 0, Value::UINTEGER(NumericCast(sniffer_options.dialect_options.skip_rows.GetValue()))); // 7. Has Header - auto has_header = Value::BOOLEAN(sniffer_options.dialect_options.header.GetValue()).ToString(); + auto has_header = Value::BOOLEAN(sniffer_options.dialect_options.header.GetValue()); output.SetValue(6, 0, has_header); // 8. 
List> {'col1': 'INTEGER', 'col2': 'VARCHAR'} vector values; diff --git a/src/duckdb/src/function/table/version/pragma_version.cpp b/src/duckdb/src/function/table/version/pragma_version.cpp index 167f071af..77e744b6b 100644 --- a/src/duckdb/src/function/table/version/pragma_version.cpp +++ b/src/duckdb/src/function/table/version/pragma_version.cpp @@ -1,5 +1,5 @@ #ifndef DUCKDB_PATCH_VERSION -#define DUCKDB_PATCH_VERSION "4-dev3985" +#define DUCKDB_PATCH_VERSION "4-dev4106" #endif #ifndef DUCKDB_MINOR_VERSION #define DUCKDB_MINOR_VERSION 1 @@ -8,10 +8,10 @@ #define DUCKDB_MAJOR_VERSION 1 #endif #ifndef DUCKDB_VERSION -#define DUCKDB_VERSION "v1.1.4-dev3985" +#define DUCKDB_VERSION "v1.1.4-dev4106" #endif #ifndef DUCKDB_SOURCE_ID -#define DUCKDB_SOURCE_ID "adc6f607a7" +#define DUCKDB_SOURCE_ID "2edfde3071" #endif #include "duckdb/function/table/system_functions.hpp" #include "duckdb/main/database.hpp" diff --git a/src/duckdb/src/function/window/window_boundaries_state.cpp b/src/duckdb/src/function/window/window_boundaries_state.cpp index 234d0cadc..7727e2a63 100644 --- a/src/duckdb/src/function/window/window_boundaries_state.cpp +++ b/src/duckdb/src/function/window/window_boundaries_state.cpp @@ -301,51 +301,86 @@ WindowBoundsSet WindowBoundariesState::GetWindowBounds(const BoundWindowExpressi WindowBoundsSet result; switch (wexpr.GetExpressionType()) { case ExpressionType::WINDOW_ROW_NUMBER: - result.insert(PARTITION_BEGIN); - if (!wexpr.arg_orders.empty()) { - // Secondary orders need to know how wide the partition is + if (wexpr.arg_orders.empty()) { + result.insert(PARTITION_BEGIN); + } else { + // Secondary orders need to know where the frame is + result.insert(FRAME_BEGIN); + result.insert(FRAME_END); + } + break; + case ExpressionType::WINDOW_NTILE: + if (wexpr.arg_orders.empty()) { + result.insert(PARTITION_BEGIN); result.insert(PARTITION_END); + } else { + // Secondary orders need to know where the frame is + result.insert(FRAME_BEGIN); + result.insert(FRAME_END); } break; - case ExpressionType::WINDOW_RANK_DENSE: case ExpressionType::WINDOW_RANK: - result.insert(PARTITION_BEGIN); if (wexpr.arg_orders.empty()) { + result.insert(PARTITION_BEGIN); result.insert(PEER_BEGIN); } else { - // Secondary orders need to know how wide the partition is - result.insert(PARTITION_END); + // Secondary orders need to know where the frame is + result.insert(FRAME_BEGIN); + result.insert(FRAME_END); } break; - case ExpressionType::WINDOW_PERCENT_RANK: + case ExpressionType::WINDOW_RANK_DENSE: result.insert(PARTITION_BEGIN); - result.insert(PARTITION_END); + result.insert(PEER_BEGIN); + break; + case ExpressionType::WINDOW_PERCENT_RANK: if (wexpr.arg_orders.empty()) { - // Secondary orders need to know where the first peer is + result.insert(PARTITION_BEGIN); + result.insert(PARTITION_END); result.insert(PEER_BEGIN); + } else { + // Secondary orders need to know where the frame is + result.insert(FRAME_BEGIN); + result.insert(FRAME_END); } break; case ExpressionType::WINDOW_CUME_DIST: - result.insert(PARTITION_BEGIN); - result.insert(PARTITION_END); if (wexpr.arg_orders.empty()) { + result.insert(PARTITION_BEGIN); + result.insert(PARTITION_END); result.insert(PEER_END); + } else { + // Secondary orders need to know where the frame is + result.insert(FRAME_BEGIN); + result.insert(FRAME_END); } break; - case ExpressionType::WINDOW_NTILE: case ExpressionType::WINDOW_LEAD: case ExpressionType::WINDOW_LAG: - result.insert(PARTITION_BEGIN); - result.insert(PARTITION_END); + if (wexpr.arg_orders.empty()) { + 
result.insert(PARTITION_BEGIN); + result.insert(PARTITION_END); + } else { + // Secondary orders need to know where the frame is + result.insert(FRAME_BEGIN); + result.insert(FRAME_END); + } break; case ExpressionType::WINDOW_FIRST_VALUE: case ExpressionType::WINDOW_LAST_VALUE: case ExpressionType::WINDOW_NTH_VALUE: case ExpressionType::WINDOW_AGGREGATE: - result.insert(PARTITION_BEGIN); - result.insert(PARTITION_END); result.insert(FRAME_BEGIN); result.insert(FRAME_END); + break; + default: + throw InternalException("Window aggregate type %s", ExpressionTypeToString(wexpr.GetExpressionType())); + } + + // Internal dependencies + if (result.count(FRAME_BEGIN) || result.count(FRAME_END)) { + result.insert(PARTITION_BEGIN); + result.insert(PARTITION_END); // if we have EXCLUDE GROUP / TIES, we also need peer boundaries if (wexpr.exclude_clause != WindowExcludeMode::NO_OTHER) { @@ -389,12 +424,8 @@ WindowBoundsSet WindowBoundariesState::GetWindowBounds(const BoundWindowExpressi default: break; } - break; - default: - throw InternalException("Window aggregate type %s", ExpressionTypeToString(wexpr.GetExpressionType())); } - // Internal dependencies if (result.count(VALID_END)) { result.insert(PARTITION_END); if (HasFollowingRange(wexpr)) { diff --git a/src/duckdb/src/function/window/window_rank_function.cpp b/src/duckdb/src/function/window/window_rank_function.cpp index d1299430a..b71a4aeac 100644 --- a/src/duckdb/src/function/window/window_rank_function.cpp +++ b/src/duckdb/src/function/window/window_rank_function.cpp @@ -117,18 +117,19 @@ void WindowRankExecutor::EvaluateInternal(WindowExecutorGlobalState &gstate, Win DataChunk &eval_chunk, Vector &result, idx_t count, idx_t row_idx) const { auto &gpeer = gstate.Cast(); auto &lpeer = lstate.Cast(); - auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(lpeer.bounds.data[PARTITION_END]); auto rdata = FlatVector::GetData(result); if (gpeer.token_tree) { + auto frame_begin = FlatVector::GetData(lpeer.bounds.data[FRAME_BEGIN]); + auto frame_end = FlatVector::GetData(lpeer.bounds.data[FRAME_END]); for (idx_t i = 0; i < count; ++i, ++row_idx) { - rdata[i] = gpeer.token_tree->Rank(partition_begin[i], partition_end[i], row_idx); + rdata[i] = gpeer.token_tree->Rank(frame_begin[i], frame_end[i], row_idx); } return; } // Reset to "previous" row + auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); auto peer_begin = FlatVector::GetData(lpeer.bounds.data[PEER_BEGIN]); lpeer.rank = (peer_begin[0] - partition_begin[0]) + 1; lpeer.rank_equal = (row_idx - peer_begin[0]); @@ -215,14 +216,14 @@ void WindowPercentRankExecutor::EvaluateInternal(WindowExecutorGlobalState &gsta idx_t row_idx) const { auto &gpeer = gstate.Cast(); auto &lpeer = lstate.Cast(); - auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(lpeer.bounds.data[PARTITION_END]); auto rdata = FlatVector::GetData(result); if (gpeer.token_tree) { + auto frame_begin = FlatVector::GetData(lpeer.bounds.data[FRAME_BEGIN]); + auto frame_end = FlatVector::GetData(lpeer.bounds.data[FRAME_END]); for (idx_t i = 0; i < count; ++i, ++row_idx) { - auto denom = static_cast(NumericCast(partition_end[i] - partition_begin[i] - 1)); - const auto rank = gpeer.token_tree->Rank(partition_begin[i], partition_end[i], row_idx); + auto denom = static_cast(NumericCast(frame_end[i] - frame_begin[i] - 1)); + const auto rank = 
gpeer.token_tree->Rank(frame_begin[i], frame_end[i], row_idx); double percent_rank = denom > 0 ? ((double)rank - 1) / denom : 0; rdata[i] = percent_rank; } @@ -230,6 +231,8 @@ void WindowPercentRankExecutor::EvaluateInternal(WindowExecutorGlobalState &gsta } // Reset to "previous" row + auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(lpeer.bounds.data[PARTITION_END]); auto peer_begin = FlatVector::GetData(lpeer.bounds.data[PEER_BEGIN]); lpeer.rank = (peer_begin[0] - partition_begin[0]) + 1; lpeer.rank_equal = (row_idx - peer_begin[0]); @@ -254,20 +257,22 @@ void WindowCumeDistExecutor::EvaluateInternal(WindowExecutorGlobalState &gstate, DataChunk &eval_chunk, Vector &result, idx_t count, idx_t row_idx) const { auto &gpeer = gstate.Cast(); auto &lpeer = lstate.Cast(); - auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(lpeer.bounds.data[PARTITION_END]); auto rdata = FlatVector::GetData(result); if (gpeer.token_tree) { + auto frame_begin = FlatVector::GetData(lpeer.bounds.data[FRAME_BEGIN]); + auto frame_end = FlatVector::GetData(lpeer.bounds.data[FRAME_END]); for (idx_t i = 0; i < count; ++i, ++row_idx) { - const auto denom = static_cast(NumericCast(partition_end[i] - partition_begin[i])); - const auto peer_end = gpeer.token_tree->PeerEnd(partition_begin[i], partition_end[i], row_idx); - const auto num = static_cast(peer_end - partition_begin[i]); + const auto denom = static_cast(NumericCast(frame_end[i] - frame_begin[i])); + const auto peer_end = gpeer.token_tree->PeerEnd(frame_begin[i], frame_end[i], row_idx); + const auto num = static_cast(peer_end - frame_begin[i]); rdata[i] = denom > 0 ? (num / denom) : 0; } return; } + auto partition_begin = FlatVector::GetData(lpeer.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(lpeer.bounds.data[PARTITION_END]); auto peer_end = FlatVector::GetData(lpeer.bounds.data[PEER_END]); for (idx_t i = 0; i < count; ++i, ++row_idx) { const auto denom = static_cast(NumericCast(partition_end[i] - partition_begin[i])); diff --git a/src/duckdb/src/function/window/window_rownumber_function.cpp b/src/duckdb/src/function/window/window_rownumber_function.cpp index 71f27fa3d..5d766980f 100644 --- a/src/duckdb/src/function/window/window_rownumber_function.cpp +++ b/src/duckdb/src/function/window/window_rownumber_function.cpp @@ -97,18 +97,19 @@ void WindowRowNumberExecutor::EvaluateInternal(WindowExecutorGlobalState &gstate idx_t row_idx) const { auto &grstate = gstate.Cast(); auto &lrstate = lstate.Cast(); - auto partition_begin = FlatVector::GetData(lrstate.bounds.data[PARTITION_BEGIN]); auto rdata = FlatVector::GetData(result); if (grstate.token_tree) { - auto partition_end = FlatVector::GetData(lrstate.bounds.data[PARTITION_END]); + auto frame_begin = FlatVector::GetData(lrstate.bounds.data[FRAME_BEGIN]); + auto frame_end = FlatVector::GetData(lrstate.bounds.data[FRAME_END]); for (idx_t i = 0; i < count; ++i, ++row_idx) { // Row numbers are unique ranks - rdata[i] = grstate.token_tree->Rank(partition_begin[i], partition_end[i], row_idx); + rdata[i] = grstate.token_tree->Rank(frame_begin[i], frame_end[i], row_idx); } return; } + auto partition_begin = FlatVector::GetData(lrstate.bounds.data[PARTITION_BEGIN]); for (idx_t i = 0; i < count; ++i, ++row_idx) { rdata[i] = row_idx - partition_begin[i] + 1; } @@ -131,6 +132,11 @@ void WindowNtileExecutor::EvaluateInternal(WindowExecutorGlobalState 
&gstate, Wi auto &lrstate = lstate.Cast(); auto partition_begin = FlatVector::GetData(lrstate.bounds.data[PARTITION_BEGIN]); auto partition_end = FlatVector::GetData(lrstate.bounds.data[PARTITION_END]); + if (grstate.token_tree) { + // With secondary sorts, we restrict to the frame boundaries, but everything else should compute the same. + partition_begin = FlatVector::GetData(lrstate.bounds.data[FRAME_BEGIN]); + partition_end = FlatVector::GetData(lrstate.bounds.data[FRAME_END]); + } auto rdata = FlatVector::GetData(result); WindowInputExpression ntile_col(eval_chunk, ntile_idx); for (idx_t i = 0; i < count; ++i, ++row_idx) { diff --git a/src/duckdb/src/function/window/window_value_function.cpp b/src/duckdb/src/function/window/window_value_function.cpp index d6bd816f2..7639b3f16 100644 --- a/src/duckdb/src/function/window/window_value_function.cpp +++ b/src/duckdb/src/function/window/window_value_function.cpp @@ -273,22 +273,22 @@ WindowLeadLagExecutor::GetLocalState(const WindowExecutorGlobalState &gstate) co void WindowLeadLagExecutor::EvaluateInternal(WindowExecutorGlobalState &gstate, WindowExecutorLocalState &lstate, DataChunk &eval_chunk, Vector &result, idx_t count, idx_t row_idx) const { auto &glstate = gstate.Cast(); - auto &ignore_nulls = glstate.ignore_nulls; auto &llstate = lstate.Cast(); auto &cursor = *llstate.cursor; WindowInputExpression leadlag_offset(eval_chunk, offset_idx); WindowInputExpression leadlag_default(eval_chunk, default_idx); - auto partition_begin = FlatVector::GetData(llstate.bounds.data[PARTITION_BEGIN]); - auto partition_end = FlatVector::GetData(llstate.bounds.data[PARTITION_END]); if (glstate.row_tree) { + auto frame_begin = FlatVector::GetData(llstate.bounds.data[FRAME_BEGIN]); + auto frame_end = FlatVector::GetData(llstate.bounds.data[FRAME_END]); + // TODO: Handle subframes. auto &frames = llstate.frames; frames.resize(1); auto &frame = frames[0]; for (idx_t i = 0; i < count; ++i, ++row_idx) { // (1) compute the ROW_NUMBER of the own row - frame = FrameBounds(partition_begin[i], partition_end[i]); + frame = FrameBounds(frame_begin[i], frame_end[i]); const auto own_row = glstate.row_tree->Rank(frame.start, frame.end, row_idx) - 1; // (2) adjust the row number by adding or subtracting an offset auto val_idx = NumericCast(own_row); @@ -317,6 +317,10 @@ void WindowLeadLagExecutor::EvaluateInternal(WindowExecutorGlobalState &gstate, return; } + auto partition_begin = FlatVector::GetData(llstate.bounds.data[PARTITION_BEGIN]); + auto partition_end = FlatVector::GetData(llstate.bounds.data[PARTITION_END]); + + auto &ignore_nulls = glstate.ignore_nulls; bool can_shift = ignore_nulls->AllValid(); if (wexpr.offset_expr) { can_shift = can_shift && wexpr.offset_expr->IsFoldable(); diff --git a/src/duckdb/src/include/duckdb/common/types/vector.hpp b/src/duckdb/src/include/duckdb/common/types/vector.hpp index 067582f45..61e0b714e 100644 --- a/src/duckdb/src/include/duckdb/common/types/vector.hpp +++ b/src/duckdb/src/include/duckdb/common/types/vector.hpp @@ -109,9 +109,10 @@ class Vector { /*! Create a new vector If create_data is true, the vector will be an owning empty vector. - If zero_data is true, the allocated data will be zero-initialized. + If initialize_to_zero is true, the allocated data will be zero-initialized. 
*/ - DUCKDB_API Vector(LogicalType type, bool create_data, bool zero_data, idx_t capacity = STANDARD_VECTOR_SIZE); + DUCKDB_API Vector(LogicalType type, bool create_data, bool initialize_to_zero, + idx_t capacity = STANDARD_VECTOR_SIZE); // implicit copying of Vectors is not allowed Vector(const Vector &) = delete; // but moving of vectors is allowed @@ -151,7 +152,7 @@ class Vector { //! Creates the data of this vector with the specified type. Any data that //! is currently in the vector is destroyed. - DUCKDB_API void Initialize(bool zero_data = false, idx_t capacity = STANDARD_VECTOR_SIZE); + DUCKDB_API void Initialize(bool initialize_to_zero = false, idx_t capacity = STANDARD_VECTOR_SIZE); //! Converts this Vector to a printable string representation DUCKDB_API string ToString(idx_t count) const; diff --git a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_error.hpp b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_error.hpp index f7acbcb9e..6dd8a22ec 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_error.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/csv_error.hpp @@ -45,7 +45,8 @@ enum CSVErrorType : uint8_t { SNIFFING = 5, //! If something went wrong during sniffing and was not possible to find suitable candidates MAXIMUM_LINE_SIZE = 6, //! Maximum line size was exceeded by a line in the CSV File NULLPADDED_QUOTED_NEW_VALUE = 7, //! If the null_padding option is set, and we have quoted new values in parallel - INVALID_UNICODE = 8 //! If we have invalid unicode values + INVALID_UNICODE = 8, //! If we have invalid unicode values + INVALID_STATE = 9 //! If our CSV Scanner ended up in an invalid state }; class CSVError { @@ -64,6 +65,10 @@ class CSVError { //! Produces error for when the line size exceeds the maximum line size option static CSVError LineSizeError(const CSVReaderOptions &options, idx_t actual_size, LinesPerBoundary error_info, string &csv_row, idx_t byte_position, const string &current_path); + //! Produces error for when the state machine reaches an invalid state + static CSVError InvalidState(const CSVReaderOptions &options, idx_t current_column, LinesPerBoundary error_info, + string &csv_row, idx_t row_byte_position, optional_idx byte_position, + const string &current_path); //! Produces an error message for a dialect sniffing error. static CSVError SniffingError(const CSVReaderOptions &options, const string &search_space); //! Produces an error message for a header sniffing error. @@ -73,7 +78,7 @@ class CSVError { static CSVError UnterminatedQuotesError(const CSVReaderOptions &options, idx_t current_column, LinesPerBoundary error_info, string &csv_row, idx_t row_byte_position, optional_idx byte_position, const string &current_path); - //! Produces error messages for null_padding option is set and we have quoted new values in parallel + //! Produces error messages for when the null_padding option is set and we have quoted new values in parallel static CSVError NullPaddingFail(const CSVReaderOptions &options, LinesPerBoundary error_info, const string &current_path); //! 
Produces error for incorrect (e.g., smaller and lower than the predefined) number of columns in a CSV Line diff --git a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/global_csv_state.hpp b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/global_csv_state.hpp index 2f63d9453..5381071ab 100644 --- a/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/global_csv_state.hpp +++ b/src/duckdb/src/include/duckdb/execution/operator/csv_scanner/global_csv_state.hpp @@ -32,7 +32,7 @@ struct CSVGlobalState : public GlobalTableFunctionState { //! In case it returns a nullptr it means we are done reading these files. unique_ptr<StringValueScanner> Next(optional_ptr<StringValueScanner> previous_scanner); - void FillRejectsTable(); + void FillRejectsTable() const; void DecrementThread(); diff --git a/src/duckdb/src/include/duckdb/function/cast/bound_cast_data.hpp b/src/duckdb/src/include/duckdb/function/cast/bound_cast_data.hpp index 50b4c70ab..fe950d0d1 100644 --- a/src/duckdb/src/include/duckdb/function/cast/bound_cast_data.hpp +++ b/src/duckdb/src/include/duckdb/function/cast/bound_cast_data.hpp @@ -48,21 +48,27 @@ struct ListCast { }; struct StructBoundCastData : public BoundCastData { - StructBoundCastData(vector<BoundCastInfo> child_casts, LogicalType target_p, vector<idx_t> child_member_map_p) + StructBoundCastData(vector<BoundCastInfo> child_casts, LogicalType target_p, vector<idx_t> source_indexes_p, + vector<idx_t> target_indexes_p, vector<idx_t> target_null_indexes_p) : child_cast_info(std::move(child_casts)), target(std::move(target_p)), - child_member_map(std::move(child_member_map_p)) { - D_ASSERT(child_cast_info.size() == child_member_map.size()); + source_indexes(std::move(source_indexes_p)), target_indexes(std::move(target_indexes_p)), + target_null_indexes(std::move(target_null_indexes_p)) { + D_ASSERT(child_cast_info.size() == source_indexes.size()); + D_ASSERT(source_indexes.size() == target_indexes.size()); } StructBoundCastData(vector<BoundCastInfo> child_casts, LogicalType target_p) : child_cast_info(std::move(child_casts)), target(std::move(target_p)) { for (idx_t i = 0; i < child_cast_info.size(); i++) { - child_member_map.push_back(i); + source_indexes.push_back(i); + target_indexes.push_back(i); } } vector<BoundCastInfo> child_cast_info; LogicalType target; - vector<idx_t> child_member_map; + vector<idx_t> source_indexes; + vector<idx_t> target_indexes; + vector<idx_t> target_null_indexes; static unique_ptr<BoundCastData> BindStructToStructCast(BindCastInput &input, const LogicalType &source, const LogicalType &target); @@ -74,7 +80,8 @@ struct StructBoundCastData : public BoundCastData { for (auto &info : child_cast_info) { copy_info.push_back(info.Copy()); } - return make_uniq<StructBoundCastData>(std::move(copy_info), target, child_member_map); + return make_uniq<StructBoundCastData>(std::move(copy_info), target, source_indexes, target_indexes, + target_null_indexes); } }; diff --git a/src/duckdb/src/include/duckdb/function/compression_function.hpp b/src/duckdb/src/include/duckdb/function/compression_function.hpp index 9199f45ac..d11beaa72 100644 --- a/src/duckdb/src/include/duckdb/function/compression_function.hpp +++ b/src/duckdb/src/include/duckdb/function/compression_function.hpp @@ -19,7 +19,7 @@ namespace duckdb { class DatabaseInstance; class ColumnData; -class ColumnDataCheckpointer; +struct ColumnDataCheckpointData; class ColumnSegment; class SegmentStatistics; class TableFilter; @@ -152,7 +152,7 @@ typedef idx_t (*compression_final_analyze_t)(AnalyzeState &state); //===--------------------------------------------------------------------===// // Compress //===--------------------------------------------------------------------===// -typedef 
unique_ptr (*compression_init_compression_t)(ColumnDataCheckpointer &checkpointer, +typedef unique_ptr (*compression_init_compression_t)(ColumnDataCheckpointData &checkpoint_data, unique_ptr state); typedef void (*compression_compress_data_t)(CompressionState &state, Vector &scan_vector, idx_t count); typedef void (*compression_compress_finalize_t)(CompressionState &state); diff --git a/src/duckdb/src/include/duckdb/storage/compression/alp/alp_compress.hpp b/src/duckdb/src/include/duckdb/storage/compression/alp/alp_compress.hpp index 2fad85771..5ba58dcf6 100644 --- a/src/duckdb/src/include/duckdb/storage/compression/alp/alp_compress.hpp +++ b/src/duckdb/src/include/duckdb/storage/compression/alp/alp_compress.hpp @@ -32,16 +32,16 @@ struct AlpCompressionState : public CompressionState { public: using EXACT_TYPE = typename FloatingToExact::TYPE; - AlpCompressionState(ColumnDataCheckpointer &checkpointer, AlpAnalyzeState *analyze_state) - : CompressionState(analyze_state->info), checkpointer(checkpointer), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_ALP)) { - CreateEmptySegment(checkpointer.GetRowGroup().start); + AlpCompressionState(ColumnDataCheckpointData &checkpoint_data, AlpAnalyzeState *analyze_state) + : CompressionState(analyze_state->info), checkpoint_data(checkpoint_data), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_ALP)) { + CreateEmptySegment(checkpoint_data.GetRowGroup().start); //! Combinations found on the analyze step are needed for compression state.best_k_combinations = analyze_state->state.best_k_combinations; } - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; unique_ptr current_segment; BufferHandle handle; @@ -90,8 +90,8 @@ struct AlpCompressionState : public CompressionState { } void CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -176,7 +176,7 @@ struct AlpCompressionState : public CompressionState { } void FlushSegment() { - auto &checkpoint_state = checkpointer.GetCheckpointState(); + auto &checkpoint_state = checkpoint_data.GetCheckpointState(); auto dataptr = handle.Ptr(); idx_t metadata_offset = AlignValue(UsedSpace()); @@ -262,8 +262,9 @@ struct AlpCompressionState : public CompressionState { }; template -unique_ptr AlpInitCompression(ColumnDataCheckpointer &checkpointer, unique_ptr state) { - return make_uniq>(checkpointer, (AlpAnalyzeState *)state.get()); +unique_ptr AlpInitCompression(ColumnDataCheckpointData &checkpoint_data, + unique_ptr state) { + return make_uniq>(checkpoint_data, (AlpAnalyzeState *)state.get()); } template diff --git a/src/duckdb/src/include/duckdb/storage/compression/alprd/alprd_compress.hpp b/src/duckdb/src/include/duckdb/storage/compression/alprd/alprd_compress.hpp index e5e8043d7..6b732e664 100644 --- a/src/duckdb/src/include/duckdb/storage/compression/alprd/alprd_compress.hpp +++ b/src/duckdb/src/include/duckdb/storage/compression/alprd/alprd_compress.hpp @@ -34,9 +34,9 @@ struct AlpRDCompressionState : public CompressionState { public: using EXACT_TYPE = typename FloatingToExact::TYPE; - AlpRDCompressionState(ColumnDataCheckpointer &checkpointer, AlpRDAnalyzeState *analyze_state) - : 
CompressionState(analyze_state->info), checkpointer(checkpointer), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_ALPRD)) { + AlpRDCompressionState(ColumnDataCheckpointData &checkpoint_data, AlpRDAnalyzeState *analyze_state) + : CompressionState(analyze_state->info), checkpoint_data(checkpoint_data), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_ALPRD)) { //! State variables from the analyze step that are needed for compression state.left_parts_dict_map = std::move(analyze_state->state.left_parts_dict_map); state.left_bit_width = analyze_state->state.left_bit_width; @@ -46,10 +46,10 @@ struct AlpRDCompressionState : public CompressionState { next_vector_byte_index_start = AlpRDConstants::HEADER_SIZE + actual_dictionary_size_bytes; memcpy((void *)state.left_parts_dict, (void *)analyze_state->state.left_parts_dict, actual_dictionary_size_bytes); - CreateEmptySegment(checkpointer.GetRowGroup().start); + CreateEmptySegment(checkpoint_data.GetRowGroup().start); } - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; unique_ptr current_segment; BufferHandle handle; @@ -100,8 +100,8 @@ struct AlpRDCompressionState : public CompressionState { } void CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -176,7 +176,7 @@ struct AlpRDCompressionState : public CompressionState { } void FlushSegment() { - auto &checkpoint_state = checkpointer.GetCheckpointState(); + auto &checkpoint_state = checkpoint_data.GetCheckpointState(); auto dataptr = handle.Ptr(); idx_t metadata_offset = AlignValue(UsedSpace()); @@ -277,9 +277,9 @@ struct AlpRDCompressionState : public CompressionState { }; template -unique_ptr AlpRDInitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr AlpRDInitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { - return make_uniq>(checkpointer, (AlpRDAnalyzeState *)state.get()); + return make_uniq>(checkpoint_data, (AlpRDAnalyzeState *)state.get()); } template diff --git a/src/duckdb/src/include/duckdb/storage/compression/chimp/chimp_compress.hpp b/src/duckdb/src/include/duckdb/storage/compression/chimp/chimp_compress.hpp index a233246f3..ae120027c 100644 --- a/src/duckdb/src/include/duckdb/storage/compression/chimp/chimp_compress.hpp +++ b/src/duckdb/src/include/duckdb/storage/compression/chimp/chimp_compress.hpp @@ -33,7 +33,7 @@ struct ChimpCompressionState : public CompressionState {}; // Compression Functions template -unique_ptr ChimpInitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr ChimpInitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { throw InternalException("Chimp has been deprecated, can no longer be used to compress data"); return nullptr; diff --git a/src/duckdb/src/include/duckdb/storage/compression/dictionary/compression.hpp b/src/duckdb/src/include/duckdb/storage/compression/dictionary/compression.hpp index 1ffdb494b..b0f29dc59 100644 --- a/src/duckdb/src/include/duckdb/storage/compression/dictionary/compression.hpp +++ b/src/duckdb/src/include/duckdb/storage/compression/dictionary/compression.hpp @@ -23,7 +23,7 @@ namespace duckdb { 
//===--------------------------------------------------------------------===// struct DictionaryCompressionCompressState : public DictionaryCompressionState { public: - DictionaryCompressionCompressState(ColumnDataCheckpointer &checkpointer_p, const CompressionInfo &info); + DictionaryCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info); public: void CreateEmptySegment(idx_t row_start); @@ -37,7 +37,7 @@ struct DictionaryCompressionCompressState : public DictionaryCompressionState { idx_t Finalize(); public: - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; // State regarding current segment diff --git a/src/duckdb/src/include/duckdb/storage/compression/patas/patas_compress.hpp b/src/duckdb/src/include/duckdb/storage/compression/patas/patas_compress.hpp index 2931e9168..b33e1a6a0 100644 --- a/src/duckdb/src/include/duckdb/storage/compression/patas/patas_compress.hpp +++ b/src/duckdb/src/include/duckdb/storage/compression/patas/patas_compress.hpp @@ -35,7 +35,7 @@ struct PatasCompressionState : public CompressionState {}; // Compression Functions template -unique_ptr PatasInitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr PatasInitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { throw InternalException("Patas has been deprecated, can no longer be used to compress data"); return nullptr; diff --git a/src/duckdb/src/include/duckdb/storage/compression/roaring/roaring.hpp b/src/duckdb/src/include/duckdb/storage/compression/roaring/roaring.hpp index 86bb985d4..7a2d1ddaa 100644 --- a/src/duckdb/src/include/duckdb/storage/compression/roaring/roaring.hpp +++ b/src/duckdb/src/include/duckdb/storage/compression/roaring/roaring.hpp @@ -319,7 +319,7 @@ struct ContainerCompressionState { struct RoaringCompressState : public CompressionState { public: - explicit RoaringCompressState(ColumnDataCheckpointer &checkpointer, unique_ptr analyze_state_p); + explicit RoaringCompressState(ColumnDataCheckpointData &checkpoint_data, unique_ptr analyze_state_p); public: //! 
RoaringStateAppender interface @@ -350,7 +350,7 @@ struct RoaringCompressState : public CompressionState { ContainerMetadataCollection metadata_collection; vector &container_metadata; - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; unique_ptr current_segment; BufferHandle handle; diff --git a/src/duckdb/src/include/duckdb/storage/segment/uncompressed.hpp b/src/duckdb/src/include/duckdb/storage/segment/uncompressed.hpp index 7452c8ed9..fb6dd6c0e 100644 --- a/src/duckdb/src/include/duckdb/storage/segment/uncompressed.hpp +++ b/src/duckdb/src/include/duckdb/storage/segment/uncompressed.hpp @@ -14,7 +14,7 @@ namespace duckdb { class DatabaseInstance; struct UncompressedFunctions { - static unique_ptr InitCompression(ColumnDataCheckpointer &checkpointer, + static unique_ptr InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state); static void Compress(CompressionState &state_p, Vector &data, idx_t count); static void FinalizeCompress(CompressionState &state_p); diff --git a/src/duckdb/src/include/duckdb/storage/table/column_data_checkpointer.hpp b/src/duckdb/src/include/duckdb/storage/table/column_data_checkpointer.hpp index 2bdc41dd8..a5bd436bb 100644 --- a/src/duckdb/src/include/duckdb/storage/table/column_data_checkpointer.hpp +++ b/src/duckdb/src/include/duckdb/storage/table/column_data_checkpointer.hpp @@ -15,38 +15,78 @@ namespace duckdb { struct TableScanOptions; -class ColumnDataCheckpointer { +//! Holds state related to a single column during compression +struct ColumnDataCheckpointData { public: - ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup &row_group_p, ColumnCheckpointState &state_p, - ColumnCheckpointInfo &checkpoint_info); + //! Default constructor used when column data does not need to be checkpointed + ColumnDataCheckpointData() { + } + ColumnDataCheckpointData(ColumnCheckpointState &checkpoint_state, ColumnData &col_data, DatabaseInstance &db, + RowGroup &row_group, ColumnCheckpointInfo &checkpoint_info) + : checkpoint_state(checkpoint_state), col_data(col_data), db(db), row_group(row_group), + checkpoint_info(checkpoint_info) { + } public: - DatabaseInstance &GetDatabase(); + CompressionFunction &GetCompressionFunction(CompressionType type); const LogicalType &GetType() const; ColumnData &GetColumnData(); RowGroup &GetRowGroup(); ColumnCheckpointState &GetCheckpointState(); + DatabaseInstance &GetDatabase(); - void Checkpoint(const column_segment_vector_t &nodes); - void FinalizeCheckpoint(column_segment_vector_t &&nodes); - CompressionFunction &GetCompressionFunction(CompressionType type); +private: + optional_ptr checkpoint_state; + optional_ptr col_data; + optional_ptr db; + optional_ptr row_group; + optional_ptr checkpoint_info; +}; + +struct CheckpointAnalyzeResult { +public: + //! 
Default constructor, returned when the column data doesn't require checkpoint + CheckpointAnalyzeResult() { + } + CheckpointAnalyzeResult(unique_ptr &&analyze_state, CompressionFunction &function) + : analyze_state(std::move(analyze_state)), function(function) { + } + +public: + unique_ptr analyze_state; + optional_ptr function; +}; + +class ColumnDataCheckpointer { +public: + ColumnDataCheckpointer(vector> &states, DatabaseInstance &db, RowGroup &row_group, + ColumnCheckpointInfo &checkpoint_info); + +public: + void Checkpoint(); + void FinalizeCheckpoint(); private: - void ScanSegments(const column_segment_vector_t &nodes, const std::function &callback); - unique_ptr DetectBestCompressionMethod(const column_segment_vector_t &nodes, idx_t &compression_idx); - void WriteToDisk(const column_segment_vector_t &nodes); - bool HasChanges(const column_segment_vector_t &nodes); - void WritePersistentSegments(column_segment_vector_t nodes); + void ScanSegments(const std::function &callback); + vector DetectBestCompressionMethod(); + void WriteToDisk(); + bool HasChanges(ColumnData &col_data); + void WritePersistentSegments(ColumnCheckpointState &state); + void InitAnalyze(); + void DropSegments(); private: - ColumnData &col_data; + vector> &checkpoint_states; + DatabaseInstance &db; RowGroup &row_group; - ColumnCheckpointState &state; - bool is_validity; - bool has_changes; Vector intermediate; - vector> compression_functions; ColumnCheckpointInfo &checkpoint_info; + + vector has_changes; + //! For every column data that is being checkpointed, the applicable functions + vector>> compression_functions; + //! For every column data that is being checkpointed, the analyze state of functions being tried + vector>> analyze_states; }; } // namespace duckdb diff --git a/src/duckdb/src/main/client_context.cpp b/src/duckdb/src/main/client_context.cpp index 94e413717..53234bdb9 100644 --- a/src/duckdb/src/main/client_context.cpp +++ b/src/duckdb/src/main/client_context.cpp @@ -236,7 +236,7 @@ ErrorData ClientContext::EndQueryInternal(ClientContextLock &lock, bool success, } } catch (std::exception &ex) { error = ErrorData(ex); - if (Exception::InvalidatesDatabase(error.Type())) { + if (Exception::InvalidatesDatabase(error.Type()) || error.Type() == ExceptionType::INTERNAL) { auto &db_inst = DatabaseInstance::GetDatabase(*this); ValidChecker::Invalidate(db_inst, error.RawMessage()); } @@ -584,7 +584,7 @@ PendingExecutionResult ClientContext::ExecuteTaskInternal(ClientContextLock &loc } } else if (!Exception::InvalidatesTransaction(error.Type())) { invalidate_transaction = false; - } else if (Exception::InvalidatesDatabase(error.Type())) { + } else if (Exception::InvalidatesDatabase(error.Type()) || error.Type() == ExceptionType::INTERNAL) { // fatal exceptions invalidate the entire database auto &db_instance = DatabaseInstance::GetDatabase(*this); ValidChecker::Invalidate(db_instance, error.RawMessage()); diff --git a/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp b/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp index efd21ed20..384760594 100644 --- a/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp +++ b/src/duckdb/src/optimizer/build_probe_side_optimizer.cpp @@ -54,8 +54,7 @@ static void FlipChildren(LogicalOperator &op) { std::swap(op.children[0], op.children[1]); switch (op.type) { case LogicalOperatorType::LOGICAL_COMPARISON_JOIN: - case LogicalOperatorType::LOGICAL_DELIM_JOIN: - case LogicalOperatorType::LOGICAL_ASOF_JOIN: { + case LogicalOperatorType::LOGICAL_DELIM_JOIN: { auto 
&join = op.Cast(); join.join_type = InverseJoinType(join.join_type); for (auto &cond : join.conditions) { @@ -242,8 +241,7 @@ void BuildProbeSideOptimizer::VisitOperator(LogicalOperator &op) { } break; } - case LogicalOperatorType::LOGICAL_ANY_JOIN: - case LogicalOperatorType::LOGICAL_ASOF_JOIN: { + case LogicalOperatorType::LOGICAL_ANY_JOIN: { auto &join = op.Cast(); // We do not yet support the RIGHT_SEMI or RIGHT_ANTI join types for these, so don't try to flip switch (join.join_type) { diff --git a/src/duckdb/src/optimizer/filter_combiner.cpp b/src/duckdb/src/optimizer/filter_combiner.cpp index 2b360c9c6..a72354d7d 100644 --- a/src/duckdb/src/optimizer/filter_combiner.cpp +++ b/src/duckdb/src/optimizer/filter_combiner.cpp @@ -705,8 +705,26 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vector(comparison_type, const_val->value); - conj_filter->child_filters.push_back(std::move(const_filter)); + if (const_val->value.IsNull()) { + switch (comparison_type) { + case ExpressionType::COMPARE_DISTINCT_FROM: { + auto null_filter = make_uniq(); + conj_filter->child_filters.push_back(std::move(null_filter)); + break; + } + case ExpressionType::COMPARE_NOT_DISTINCT_FROM: { + auto null_filter = make_uniq(); + conj_filter->child_filters.push_back(std::move(null_filter)); + break; + } + // if any other comparison type (i.e EQUAL, NOT_EQUAL) do not push a table filter + default: + break; + } + } else { + auto const_filter = make_uniq(comparison_type, const_val->value); + conj_filter->child_filters.push_back(std::move(const_filter)); + } } if (column_id.IsValid()) { optional_filter->child_filter = std::move(conj_filter); @@ -716,8 +734,6 @@ TableFilterSet FilterCombiner::GenerateTableScanFilters(const vectorschema); - auto &table = schema.GetEntry(transaction, CatalogType::TABLE_ENTRY, info.table)->Cast(); + auto catalog_table = schema.GetEntry(transaction, CatalogType::TABLE_ENTRY, info.table); + if (!catalog_table) { + // See internal issue 3663. + throw IOException("corrupt database file - index entry without table entry"); + } + auto &table = catalog_table->Cast(); // we also need to make sure the index type is loaded // backwards compatibility: @@ -462,6 +467,7 @@ void CheckpointReader::ReadIndex(CatalogTransaction transaction, Deserializer &d // now we can look for the index in the catalog and assign the table info auto &index = schema.CreateIndex(transaction, info, table)->Cast(); auto &data_table = table.GetStorage(); + auto &table_info = data_table.GetDataTableInfo(); IndexStorageInfo index_storage_info; if (root_block_pointer.IsValid()) { @@ -471,7 +477,7 @@ void CheckpointReader::ReadIndex(CatalogTransaction transaction, Deserializer &d } else { // Read the matching index storage info. - for (auto const &elem : data_table.GetDataTableInfo()->GetIndexStorageInfo()) { + for (auto const &elem : table_info->GetIndexStorageInfo()) { if (elem.name == index.name) { index_storage_info = elem; break; @@ -479,12 +485,13 @@ void CheckpointReader::ReadIndex(CatalogTransaction transaction, Deserializer &d } } - D_ASSERT(index_storage_info.IsValid() && !index_storage_info.name.empty()); + D_ASSERT(index_storage_info.IsValid()); + D_ASSERT(!index_storage_info.name.empty()); // Create an unbound index and add it to the table. 
auto unbound_index = make_uniq(std::move(create_info), index_storage_info, TableIOManager::Get(data_table), data_table.db); - data_table.GetDataTableInfo()->GetIndexes().AddIndex(std::move(unbound_index)); + table_info->GetIndexes().AddIndex(std::move(unbound_index)); } //===--------------------------------------------------------------------===// diff --git a/src/duckdb/src/storage/compression/bitpacking.cpp b/src/duckdb/src/storage/compression/bitpacking.cpp index aa7f24871..af2e35c2d 100644 --- a/src/duckdb/src/storage/compression/bitpacking.cpp +++ b/src/duckdb/src/storage/compression/bitpacking.cpp @@ -377,18 +377,18 @@ idx_t BitpackingFinalAnalyze(AnalyzeState &state) { template ::type> struct BitpackingCompressState : public CompressionState { public: - explicit BitpackingCompressState(ColumnDataCheckpointer &checkpointer, const CompressionInfo &info) - : CompressionState(info), checkpointer(checkpointer), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_BITPACKING)) { - CreateEmptySegment(checkpointer.GetRowGroup().start); + explicit BitpackingCompressState(ColumnDataCheckpointData &checkpoint_data, const CompressionInfo &info) + : CompressionState(info), checkpoint_data(checkpoint_data), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_BITPACKING)) { + CreateEmptySegment(checkpoint_data.GetRowGroup().start); state.data_ptr = reinterpret_cast(this); - auto &config = DBConfig::GetConfig(checkpointer.GetDatabase()); + auto &config = DBConfig::GetConfig(checkpoint_data.GetDatabase()); state.mode = config.options.force_bitpacking_mode; } - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; unique_ptr current_segment; BufferHandle handle; @@ -495,8 +495,8 @@ struct BitpackingCompressState : public CompressionState { } void CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -528,7 +528,7 @@ struct BitpackingCompressState : public CompressionState { } void FlushSegment() { - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); auto base_ptr = handle.Ptr(); // Compact the segment by moving the metadata next to the data. 
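The same mechanical substitution repeats across every compression state touched by this diff (ALP, ALP-RD, bitpacking, dictionary, FSST, RLE, roaring, ZSTD, uncompressed): the state no longer holds a ColumnDataCheckpointer but a ColumnDataCheckpointData, and pulls the database, column type, row group, and checkpoint state from it. A minimal sketch of the resulting shape, assuming a hypothetical ExampleCompressState as a stand-in for the concrete states; only the ColumnDataCheckpointData accessors visible in the surrounding hunks are real API:

namespace duckdb {

// Hypothetical compression state showing the new interface. Everything it
// needs during compression now comes from the per-column checkpoint data.
struct ExampleCompressState : public CompressionState {
	ExampleCompressState(ColumnDataCheckpointData &checkpoint_data, const CompressionInfo &info)
	    : CompressionState(info), checkpoint_data(checkpoint_data),
	      function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_UNCOMPRESSED)) {
		// Segments are still created starting at the row group start.
		CreateEmptySegment(checkpoint_data.GetRowGroup().start);
	}

	void CreateEmptySegment(idx_t row_start) {
		// Database and type come from the checkpoint data, not from a checkpointer.
		auto &db = checkpoint_data.GetDatabase();
		auto &type = checkpoint_data.GetType();
		current_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start,
		                                                        info.GetBlockSize(), info.GetBlockSize());
	}

	void FlushSegment(BufferHandle handle, idx_t segment_size) {
		// Flushing still routes through the per-column checkpoint state.
		auto &state = checkpoint_data.GetCheckpointState();
		state.FlushSegment(std::move(current_segment), std::move(handle), segment_size);
	}

	ColumnDataCheckpointData &checkpoint_data;
	CompressionFunction &function;
	unique_ptr<ColumnSegment> current_segment;
};

} // namespace duckdb

The init_compression factories change in lockstep, as the next hunk shows for BitpackingInitCompression: they accept a ColumnDataCheckpointData & and forward it to the state.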
@@ -563,9 +563,9 @@ struct BitpackingCompressState : public CompressionState { }; template -unique_ptr BitpackingInitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr BitpackingInitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { - return make_uniq>(checkpointer, state->info); + return make_uniq>(checkpoint_data, state->info); } template diff --git a/src/duckdb/src/storage/compression/dictionary/compression.cpp b/src/duckdb/src/storage/compression/dictionary/compression.cpp index 0b9a47e98..064697fc7 100644 --- a/src/duckdb/src/storage/compression/dictionary/compression.cpp +++ b/src/duckdb/src/storage/compression/dictionary/compression.cpp @@ -3,17 +3,17 @@ namespace duckdb { -DictionaryCompressionCompressState::DictionaryCompressionCompressState(ColumnDataCheckpointer &checkpointer_p, +DictionaryCompressionCompressState::DictionaryCompressionCompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info) - : DictionaryCompressionState(info), checkpointer(checkpointer_p), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_DICTIONARY)), - heap(BufferAllocator::Get(checkpointer.GetDatabase())) { - CreateEmptySegment(checkpointer.GetRowGroup().start); + : DictionaryCompressionState(info), checkpoint_data(checkpoint_data_p), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_DICTIONARY)), + heap(BufferAllocator::Get(checkpoint_data.GetDatabase())) { + CreateEmptySegment(checkpoint_data.GetRowGroup().start); } void DictionaryCompressionCompressState::CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -31,7 +31,7 @@ void DictionaryCompressionCompressState::CreateEmptySegment(idx_t row_start) { next_width = 0; // Reset the pointers into the current segment. 
- auto &buffer_manager = BufferManager::GetBufferManager(checkpointer.GetDatabase()); + auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase()); current_handle = buffer_manager.Pin(current_segment->block); current_dictionary = DictionaryCompression::GetDictionary(*current_segment, current_handle); current_end_ptr = current_handle.Ptr() + current_dictionary.end; @@ -105,7 +105,7 @@ void DictionaryCompressionCompressState::Flush(bool final) { auto next_start = current_segment->start + current_segment->count; auto segment_size = Finalize(); - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); state.FlushSegment(std::move(current_segment), std::move(current_handle), segment_size); if (!final) { @@ -114,7 +114,7 @@ void DictionaryCompressionCompressState::Flush(bool final) { } idx_t DictionaryCompressionCompressState::Finalize() { - auto &buffer_manager = BufferManager::GetBufferManager(checkpointer.GetDatabase()); + auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase()); auto handle = buffer_manager.Pin(current_segment->block); D_ASSERT(current_dictionary.end == info.GetBlockSize()); diff --git a/src/duckdb/src/storage/compression/dictionary_compression.cpp b/src/duckdb/src/storage/compression/dictionary_compression.cpp index 1ea1bb326..6f6b4beed 100644 --- a/src/duckdb/src/storage/compression/dictionary_compression.cpp +++ b/src/duckdb/src/storage/compression/dictionary_compression.cpp @@ -52,7 +52,7 @@ struct DictionaryCompressionStorage { static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count); static idx_t StringFinalAnalyze(AnalyzeState &state_p); - static unique_ptr InitCompression(ColumnDataCheckpointer &checkpointer, + static unique_ptr InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state); static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count); static void FinalizeCompress(CompressionState &state_p); @@ -94,9 +94,9 @@ idx_t DictionaryCompressionStorage::StringFinalAnalyze(AnalyzeState &state_p) { //===--------------------------------------------------------------------===// // Compress //===--------------------------------------------------------------------===// -unique_ptr DictionaryCompressionStorage::InitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr DictionaryCompressionStorage::InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { - return make_uniq(checkpointer, state->info); + return make_uniq(checkpoint_data, state->info); } void DictionaryCompressionStorage::Compress(CompressionState &state_p, Vector &scan_vector, idx_t count) { diff --git a/src/duckdb/src/storage/compression/fixed_size_uncompressed.cpp b/src/duckdb/src/storage/compression/fixed_size_uncompressed.cpp index f694a13f8..3215f1b8f 100644 --- a/src/duckdb/src/storage/compression/fixed_size_uncompressed.cpp +++ b/src/duckdb/src/storage/compression/fixed_size_uncompressed.cpp @@ -43,7 +43,7 @@ idx_t FixedSizeFinalAnalyze(AnalyzeState &state_p) { //===--------------------------------------------------------------------===// struct UncompressedCompressState : public CompressionState { public: - UncompressedCompressState(ColumnDataCheckpointer &checkpointer, const CompressionInfo &info); + UncompressedCompressState(ColumnDataCheckpointData &checkpoint_data, const CompressionInfo &info); public: virtual void CreateEmptySegment(idx_t row_start); @@ -51,35 +51,36 @@ struct UncompressedCompressState : 
public CompressionState { void Finalize(idx_t segment_size); public: - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; unique_ptr current_segment; ColumnAppendState append_state; }; -UncompressedCompressState::UncompressedCompressState(ColumnDataCheckpointer &checkpointer, const CompressionInfo &info) - : CompressionState(info), checkpointer(checkpointer), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_UNCOMPRESSED)) { - UncompressedCompressState::CreateEmptySegment(checkpointer.GetRowGroup().start); +UncompressedCompressState::UncompressedCompressState(ColumnDataCheckpointData &checkpoint_data, + const CompressionInfo &info) + : CompressionState(info), checkpoint_data(checkpoint_data), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_UNCOMPRESSED)) { + UncompressedCompressState::CreateEmptySegment(checkpoint_data.GetRowGroup().start); } void UncompressedCompressState::CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); if (type.InternalType() == PhysicalType::VARCHAR) { auto &state = compressed_segment->GetSegmentState()->Cast(); state.overflow_writer = - make_uniq(checkpointer.GetCheckpointState().GetPartialBlockManager()); + make_uniq(checkpoint_data.GetCheckpointState().GetPartialBlockManager()); } current_segment = std::move(compressed_segment); current_segment->InitializeAppend(append_state); } void UncompressedCompressState::FlushSegment(idx_t segment_size) { - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); if (current_segment->type.InternalType() == PhysicalType::VARCHAR) { auto &segment_state = current_segment->GetSegmentState()->Cast(); segment_state.overflow_writer->Flush(); @@ -96,9 +97,9 @@ void UncompressedCompressState::Finalize(idx_t segment_size) { current_segment.reset(); } -unique_ptr UncompressedFunctions::InitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr UncompressedFunctions::InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { - return make_uniq(checkpointer, state->info); + return make_uniq(checkpoint_data, state->info); } void UncompressedFunctions::Compress(CompressionState &state_p, Vector &data, idx_t count) { diff --git a/src/duckdb/src/storage/compression/fsst.cpp b/src/duckdb/src/storage/compression/fsst.cpp index 6d0bb9b3d..bc5c52228 100644 --- a/src/duckdb/src/storage/compression/fsst.cpp +++ b/src/duckdb/src/storage/compression/fsst.cpp @@ -43,7 +43,7 @@ struct FSSTStorage { static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count); static idx_t StringFinalAnalyze(AnalyzeState &state_p); - static unique_ptr InitCompression(ColumnDataCheckpointer &checkpointer, + static unique_ptr InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr analyze_state_p); static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count); static void FinalizeCompress(CompressionState &state_p); @@ -208,10 +208,10 @@ idx_t FSSTStorage::StringFinalAnalyze(AnalyzeState &state_p) { class FSSTCompressionState : public CompressionState { public: - FSSTCompressionState(ColumnDataCheckpointer &checkpointer, const CompressionInfo 
&info) - : CompressionState(info), checkpointer(checkpointer), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_FSST)) { - CreateEmptySegment(checkpointer.GetRowGroup().start); + FSSTCompressionState(ColumnDataCheckpointData &checkpoint_data, const CompressionInfo &info) + : CompressionState(info), checkpoint_data(checkpoint_data), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_FSST)) { + CreateEmptySegment(checkpoint_data.GetRowGroup().start); } ~FSSTCompressionState() override { @@ -234,8 +234,8 @@ class FSSTCompressionState : public CompressionState { } void CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -318,7 +318,7 @@ class FSSTCompressionState : public CompressionState { auto next_start = current_segment->start + current_segment->count; auto segment_size = Finalize(); - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); state.FlushSegment(std::move(current_segment), std::move(current_handle), segment_size); if (!final) { @@ -382,7 +382,7 @@ class FSSTCompressionState : public CompressionState { return total_size; } - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; // State regarding current segment @@ -403,10 +403,10 @@ class FSSTCompressionState : public CompressionState { size_t fsst_serialized_symbol_table_size = sizeof(duckdb_fsst_decoder_t); }; -unique_ptr FSSTStorage::InitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr FSSTStorage::InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr analyze_state_p) { auto &analyze_state = analyze_state_p->Cast(); - auto compression_state = make_uniq(checkpointer, analyze_state.info); + auto compression_state = make_uniq(checkpoint_data, analyze_state.info); if (analyze_state.fsst_encoder == nullptr) { throw InternalException("No encoder found during FSST compression"); diff --git a/src/duckdb/src/storage/compression/rle.cpp b/src/duckdb/src/storage/compression/rle.cpp index d453f9b8d..0144ff829 100644 --- a/src/duckdb/src/storage/compression/rle.cpp +++ b/src/duckdb/src/storage/compression/rle.cpp @@ -137,18 +137,18 @@ struct RLECompressState : public CompressionState { return (info.GetBlockSize() - RLEConstants::RLE_HEADER_SIZE) / entry_size; } - RLECompressState(ColumnDataCheckpointer &checkpointer_p, const CompressionInfo &info) - : CompressionState(info), checkpointer(checkpointer_p), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_RLE)) { - CreateEmptySegment(checkpointer.GetRowGroup().start); + RLECompressState(ColumnDataCheckpointData &checkpoint_data_p, const CompressionInfo &info) + : CompressionState(info), checkpoint_data(checkpoint_data_p), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_RLE)) { + CreateEmptySegment(checkpoint_data.GetRowGroup().start); state.dataptr = (void *)this; max_rle_count = MaxRLECount(); } void CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto column_segment = 
ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -203,7 +203,7 @@ struct RLECompressState : public CompressionState { Store(minimal_rle_offset, data_ptr); handle.Destroy(); - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); state.FlushSegment(std::move(current_segment), std::move(handle), total_segment_size); } @@ -214,7 +214,7 @@ struct RLECompressState : public CompressionState { current_segment.reset(); } - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData &checkpoint_data; CompressionFunction &function; unique_ptr current_segment; BufferHandle handle; @@ -225,8 +225,9 @@ struct RLECompressState : public CompressionState { }; template -unique_ptr RLEInitCompression(ColumnDataCheckpointer &checkpointer, unique_ptr state) { - return make_uniq>(checkpointer, state->info); +unique_ptr RLEInitCompression(ColumnDataCheckpointData &checkpoint_data, + unique_ptr state) { + return make_uniq>(checkpoint_data, state->info); } template diff --git a/src/duckdb/src/storage/compression/roaring/common.cpp b/src/duckdb/src/storage/compression/roaring/common.cpp index d680a4d02..e5dad7503 100644 --- a/src/duckdb/src/storage/compression/roaring/common.cpp +++ b/src/duckdb/src/storage/compression/roaring/common.cpp @@ -188,9 +188,9 @@ idx_t RoaringFinalAnalyze(AnalyzeState &state) { return LossyNumericCast((double)roaring_state.total_size * ROARING_COMPRESS_PENALTY); } -unique_ptr RoaringInitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr RoaringInitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr state) { - return make_uniq(checkpointer, std::move(state)); + return make_uniq(checkpoint_data, std::move(state)); } void RoaringCompress(CompressionState &state_p, Vector &scan_vector, idx_t count) { diff --git a/src/duckdb/src/storage/compression/roaring/compress.cpp b/src/duckdb/src/storage/compression/roaring/compress.cpp index 7de695b96..77463d9e2 100644 --- a/src/duckdb/src/storage/compression/roaring/compress.cpp +++ b/src/duckdb/src/storage/compression/roaring/compress.cpp @@ -193,13 +193,13 @@ void ContainerCompressionState::Reset() { //===--------------------------------------------------------------------===// // Compress //===--------------------------------------------------------------------===// -RoaringCompressState::RoaringCompressState(ColumnDataCheckpointer &checkpointer, +RoaringCompressState::RoaringCompressState(ColumnDataCheckpointData &checkpoint_data, unique_ptr analyze_state_p) : CompressionState(analyze_state_p->info), owned_analyze_state(std::move(analyze_state_p)), analyze_state(owned_analyze_state->Cast()), container_state(), - container_metadata(analyze_state.container_metadata), checkpointer(checkpointer), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_ROARING)) { - CreateEmptySegment(checkpointer.GetRowGroup().start); + container_metadata(analyze_state.container_metadata), checkpoint_data(checkpoint_data), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_ROARING)) { + CreateEmptySegment(checkpoint_data.GetRowGroup().start); total_count = 0; InitializeContainer(); } @@ -276,8 +276,8 @@ void RoaringCompressState::InitializeContainer() { } void RoaringCompressState::CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = 
checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); @@ -291,7 +291,7 @@ void RoaringCompressState::CreateEmptySegment(idx_t row_start) { } void RoaringCompressState::FlushSegment() { - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); auto base_ptr = handle.Ptr(); // +======================================+ // |x|ddddddddddddddd||mmm| | diff --git a/src/duckdb/src/storage/compression/zstd.cpp b/src/duckdb/src/storage/compression/zstd.cpp index 945cd644f..6552f0dc8 100644 --- a/src/duckdb/src/storage/compression/zstd.cpp +++ b/src/duckdb/src/storage/compression/zstd.cpp @@ -76,7 +76,7 @@ struct ZSTDStorage { static bool StringAnalyze(AnalyzeState &state_p, Vector &input, idx_t count); static idx_t StringFinalAnalyze(AnalyzeState &state_p); - static unique_ptr InitCompression(ColumnDataCheckpointer &checkpointer, + static unique_ptr InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr analyze_state_p); static void Compress(CompressionState &state_p, Vector &scan_vector, idx_t count); static void FinalizeCompress(CompressionState &state_p); @@ -218,10 +218,12 @@ idx_t ZSTDStorage::StringFinalAnalyze(AnalyzeState &state_p) { class ZSTDCompressionState : public CompressionState { public: - explicit ZSTDCompressionState(ColumnDataCheckpointer &checkpointer, unique_ptr &&analyze_state_p) + explicit ZSTDCompressionState(ColumnDataCheckpointData &checkpoint_data, + unique_ptr &&analyze_state_p) : CompressionState(analyze_state_p->info), analyze_state(std::move(analyze_state_p)), - checkpointer(checkpointer), partial_block_manager(checkpointer.GetCheckpointState().GetPartialBlockManager()), - function(checkpointer.GetCompressionFunction(CompressionType::COMPRESSION_ZSTD)) { + checkpoint_data(checkpoint_data), + partial_block_manager(checkpoint_data.GetCheckpointState().GetPartialBlockManager()), + function(checkpoint_data.GetCompressionFunction(CompressionType::COMPRESSION_ZSTD)) { total_vector_count = GetVectorCount(analyze_state->count); total_segment_count = analyze_state->segment_count; @@ -303,7 +305,7 @@ class ZSTDCompressionState : public CompressionState { row_start = segment->start + segment->count; FlushSegment(); } else { - row_start = checkpointer.GetRowGroup().start; + row_start = checkpoint_data.GetRowGroup().start; } CreateEmptySegment(row_start); @@ -517,18 +519,18 @@ class ZSTDCompressionState : public CompressionState { } void CreateEmptySegment(idx_t row_start) { - auto &db = checkpointer.GetDatabase(); - auto &type = checkpointer.GetType(); + auto &db = checkpoint_data.GetDatabase(); + auto &type = checkpoint_data.GetType(); auto compressed_segment = ColumnSegment::CreateTransientSegment(db, function, type, row_start, info.GetBlockSize(), info.GetBlockSize()); segment = std::move(compressed_segment); - auto &buffer_manager = BufferManager::GetBufferManager(checkpointer.GetDatabase()); + auto &buffer_manager = BufferManager::GetBufferManager(checkpoint_data.GetDatabase()); segment_handle = buffer_manager.Pin(segment->block); } void FlushSegment() { - auto &state = checkpointer.GetCheckpointState(); + auto &state = checkpoint_data.GetCheckpointState(); idx_t segment_block_size; if (current_buffer.get() == &segment_handle) { @@ -555,7 +557,7 @@ class ZSTDCompressionState : public CompressionState { public: unique_ptr analyze_state; - ColumnDataCheckpointer &checkpointer; + ColumnDataCheckpointData 
&checkpoint_data; PartialBlockManager &partial_block_manager; CompressionFunction &function; @@ -611,9 +613,9 @@ class ZSTDCompressionState : public CompressionState { idx_t vector_size; }; -unique_ptr ZSTDStorage::InitCompression(ColumnDataCheckpointer &checkpointer, +unique_ptr ZSTDStorage::InitCompression(ColumnDataCheckpointData &checkpoint_data, unique_ptr analyze_state_p) { - return make_uniq(checkpointer, + return make_uniq(checkpoint_data, unique_ptr_cast(std::move(analyze_state_p))); } diff --git a/src/duckdb/src/storage/table/column_data.cpp b/src/duckdb/src/storage/table/column_data.cpp index 5b4b1cafa..53e7c5459 100644 --- a/src/duckdb/src/storage/table/column_data.cpp +++ b/src/duckdb/src/storage/table/column_data.cpp @@ -620,26 +620,16 @@ unique_ptr ColumnData::Checkpoint(RowGroup &row_group, Co auto checkpoint_state = CreateCheckpointState(row_group, checkpoint_info.info.manager); checkpoint_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique(); - auto l = data.Lock(); - auto &nodes = data.ReferenceSegments(l); + auto &nodes = data.ReferenceSegments(); if (nodes.empty()) { // empty table: flush the empty list return checkpoint_state; } - ColumnDataCheckpointer checkpointer(*this, row_group, *checkpoint_state, checkpoint_info); - checkpointer.Checkpoint(nodes); - checkpointer.FinalizeCheckpoint(data.MoveSegments(l)); - - // reset the compression function - compression.reset(); - // replace the old tree with the new one - auto new_segments = checkpoint_state->new_tree.MoveSegments(); - for (auto &new_segment : new_segments) { - AppendSegment(l, std::move(new_segment.node)); - } - ClearUpdates(); - + vector> states {*checkpoint_state}; + ColumnDataCheckpointer checkpointer(states, GetDatabase(), row_group, checkpoint_info); + checkpointer.Checkpoint(); + checkpointer.FinalizeCheckpoint(); return checkpoint_state; } diff --git a/src/duckdb/src/storage/table/column_data_checkpointer.cpp b/src/duckdb/src/storage/table/column_data_checkpointer.cpp index 2a7f0e957..13658077d 100644 --- a/src/duckdb/src/storage/table/column_data_checkpointer.cpp +++ b/src/duckdb/src/storage/table/column_data_checkpointer.cpp @@ -7,43 +7,74 @@ namespace duckdb { -ColumnDataCheckpointer::ColumnDataCheckpointer(ColumnData &col_data_p, RowGroup &row_group_p, - ColumnCheckpointState &state_p, ColumnCheckpointInfo &checkpoint_info_p) - : col_data(col_data_p), row_group(row_group_p), state(state_p), - is_validity(GetType().id() == LogicalTypeId::VALIDITY), - intermediate(is_validity ? LogicalType::BOOLEAN : GetType(), true, is_validity), - checkpoint_info(checkpoint_info_p) { - - auto &config = DBConfig::GetConfig(GetDatabase()); - auto functions = config.GetCompressionFunctions(GetType().InternalType()); - for (auto &func : functions) { - compression_functions.push_back(&func.get()); - } +//! 
ColumnDataCheckpointData + +CompressionFunction &ColumnDataCheckpointData::GetCompressionFunction(CompressionType compression_type) { + auto &db = col_data->GetDatabase(); + auto &column_type = col_data->type; + auto &config = DBConfig::GetConfig(db); + return *config.GetCompressionFunction(compression_type, column_type.InternalType()); +} + +DatabaseInstance &ColumnDataCheckpointData::GetDatabase() { + return col_data->GetDatabase(); } -DatabaseInstance &ColumnDataCheckpointer::GetDatabase() { - return col_data.GetDatabase(); +const LogicalType &ColumnDataCheckpointData::GetType() const { + return col_data->type; } -const LogicalType &ColumnDataCheckpointer::GetType() const { - return col_data.type; +ColumnData &ColumnDataCheckpointData::GetColumnData() { + return *col_data; } -ColumnData &ColumnDataCheckpointer::GetColumnData() { - return col_data; +RowGroup &ColumnDataCheckpointData::GetRowGroup() { + return *row_group; } -RowGroup &ColumnDataCheckpointer::GetRowGroup() { - return row_group; +ColumnCheckpointState &ColumnDataCheckpointData::GetCheckpointState() { + return *checkpoint_state; } -ColumnCheckpointState &ColumnDataCheckpointer::GetCheckpointState() { - return state; +//! ColumnDataCheckpointer + +static Vector CreateIntermediateVector(vector> &states) { + D_ASSERT(!states.empty()); + + auto &first_state = states[0]; + auto &col_data = first_state.get().column_data; + auto &type = col_data.type; + if (type.id() == LogicalTypeId::VALIDITY) { + return Vector(LogicalType::BOOLEAN, true, /* initialize_to_zero = */ true); + } + return Vector(type, true, false); +} + +ColumnDataCheckpointer::ColumnDataCheckpointer(vector> &checkpoint_states, + DatabaseInstance &db, RowGroup &row_group, + ColumnCheckpointInfo &checkpoint_info) + : checkpoint_states(checkpoint_states), db(db), row_group(row_group), + intermediate(CreateIntermediateVector(checkpoint_states)), checkpoint_info(checkpoint_info) { + + auto &config = DBConfig::GetConfig(db); + compression_functions.resize(checkpoint_states.size()); + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + auto &col_data = checkpoint_states[i].get().column_data; + auto to_add = config.GetCompressionFunctions(col_data.type.InternalType()); + auto &functions = compression_functions[i]; + for (auto &func : to_add) { + functions.push_back(&func.get()); + } + } } -void ColumnDataCheckpointer::ScanSegments(const column_segment_vector_t &nodes, - const std::function &callback) { +void ColumnDataCheckpointer::ScanSegments(const std::function &callback) { Vector scan_vector(intermediate.GetType(), nullptr); + auto &first_state = checkpoint_states[0]; + auto &col_data = first_state.get().column_data; + auto &nodes = col_data.data.ReferenceSegments(); + + // TODO: scan all the nodes from all segments, no need for CheckpointScan to virtualize this I think.. 
for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { auto &segment = *nodes[segment_idx].node; ColumnScanState scan_state; @@ -57,7 +88,6 @@ void ColumnDataCheckpointer::ScanSegments(const column_segment_vector_t &nodes, scan_state.row_index = segment.start + base_row_index; col_data.CheckpointScan(segment, scan_state, row_group.start, count, scan_vector); - callback(scan_vector, count); } } @@ -80,132 +110,219 @@ CompressionType ForceCompression(vector> &comp break; } } - if (found) { - // the force_compression method is available - // clear all other compression methods - // except the uncompressed method, so we can fall back on that - for (idx_t i = 0; i < compression_functions.size(); i++) { - auto &compression_function = *compression_functions[i]; - if (compression_function.type == CompressionType::COMPRESSION_UNCOMPRESSED) { + if (!found) { + return CompressionType::COMPRESSION_AUTO; + } + // the force_compression method is available + // clear all other compression methods + // except the uncompressed method, so we can fall back on that + for (idx_t i = 0; i < compression_functions.size(); i++) { + auto &compression_function = *compression_functions[i]; + if (compression_function.type == CompressionType::COMPRESSION_UNCOMPRESSED) { + continue; + } + if (compression_function.type != compression_type) { + compression_functions[i] = nullptr; + } + } + return compression_type; +} + +void ColumnDataCheckpointer::InitAnalyze() { + analyze_states.resize(checkpoint_states.size()); + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + if (!has_changes[i]) { + continue; + } + + auto &functions = compression_functions[i]; + auto &states = analyze_states[i]; + auto &checkpoint_state = checkpoint_states[i]; + auto &coldata = checkpoint_state.get().column_data; + states.resize(functions.size()); + for (idx_t j = 0; j < functions.size(); j++) { + auto &func = functions[j]; + if (!func) { continue; } - if (compression_function.type != compression_type) { - compression_functions[i] = nullptr; - } + states[j] = func->init_analyze(coldata, coldata.type.InternalType()); } } - return found ? 
compression_type : CompressionType::COMPRESSION_AUTO; } -unique_ptr ColumnDataCheckpointer::DetectBestCompressionMethod(const column_segment_vector_t &nodes, - idx_t &compression_idx) { +vector ColumnDataCheckpointer::DetectBestCompressionMethod() { D_ASSERT(!compression_functions.empty()); - auto &config = DBConfig::GetConfig(GetDatabase()); - CompressionType forced_method = CompressionType::COMPRESSION_AUTO; + auto &config = DBConfig::GetConfig(db); + vector forced_methods(checkpoint_states.size(), CompressionType::COMPRESSION_AUTO); auto compression_type = checkpoint_info.GetCompressionType(); - if (compression_type != CompressionType::COMPRESSION_AUTO) { - forced_method = ForceCompression(compression_functions, compression_type); - } - if (compression_type == CompressionType::COMPRESSION_AUTO && - config.options.force_compression != CompressionType::COMPRESSION_AUTO) { - forced_method = ForceCompression(compression_functions, config.options.force_compression); - } - // set up the analyze states for each compression method - vector> analyze_states; - analyze_states.reserve(compression_functions.size()); - for (idx_t i = 0; i < compression_functions.size(); i++) { - if (!compression_functions[i]) { - analyze_states.push_back(nullptr); - continue; + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + auto &functions = compression_functions[i]; + if (compression_type != CompressionType::COMPRESSION_AUTO) { + forced_methods[i] = ForceCompression(functions, compression_type); + } + if (compression_type == CompressionType::COMPRESSION_AUTO && + config.options.force_compression != CompressionType::COMPRESSION_AUTO) { + forced_methods[i] = ForceCompression(functions, config.options.force_compression); } - analyze_states.push_back(compression_functions[i]->init_analyze(col_data, col_data.type.InternalType())); } + InitAnalyze(); + // scan over all the segments and run the analyze step - ScanSegments(nodes, [&](Vector &scan_vector, idx_t count) { - for (idx_t i = 0; i < compression_functions.size(); i++) { - if (!compression_functions[i]) { + ScanSegments([&](Vector &scan_vector, idx_t count) { + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + if (!has_changes[i]) { continue; } - bool success = false; - if (analyze_states[i]) { - success = compression_functions[i]->analyze(*analyze_states[i], scan_vector, count); - } - if (!success) { - // could not use this compression function on this data set - // erase it - compression_functions[i] = nullptr; - analyze_states[i].reset(); + + auto &functions = compression_functions[i]; + auto &states = analyze_states[i]; + for (idx_t j = 0; j < functions.size(); j++) { + auto &state = states[j]; + auto &func = functions[j]; + + if (!state) { + continue; + } + if (!func->analyze(*state, scan_vector, count)) { + state = nullptr; + func = nullptr; + } } } }); - // now that we have passed over all the data, we need to figure out the best method - // we do this using the final_analyze method - unique_ptr state; - compression_idx = DConstants::INVALID_INDEX; - idx_t best_score = NumericLimits::Maximum(); - for (idx_t i = 0; i < compression_functions.size(); i++) { - if (!compression_functions[i]) { - continue; - } - if (!analyze_states[i]) { - continue; - } - //! Check if the method type is the forced method (if forced is used) - bool forced_method_found = compression_functions[i]->type == forced_method; - auto score = compression_functions[i]->final_analyze(*analyze_states[i]); + vector result; + result.resize(checkpoint_states.size()); - //! 
The finalize method can return this value from final_analyze to indicate it should not be used. - if (score == DConstants::INVALID_INDEX) { + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + if (!has_changes[i]) { continue; } + auto &functions = compression_functions[i]; + auto &states = analyze_states[i]; + auto &forced_method = forced_methods[i]; + + unique_ptr chosen_state; + idx_t best_score = NumericLimits::Maximum(); + idx_t compression_idx = DConstants::INVALID_INDEX; - if (score < best_score || forced_method_found) { - compression_idx = i; - best_score = score; - state = std::move(analyze_states[i]); + D_ASSERT(functions.size() == states.size()); + for (idx_t j = 0; j < functions.size(); j++) { + auto &function = functions[j]; + auto &state = states[j]; + + if (!state) { + continue; + } + + //! Check if the method type is the forced method (if forced is used) + bool forced_method_found = function->type == forced_method; + // now that we have passed over all the data, we need to figure out the best method + // we do this using the final_analyze method + auto score = function->final_analyze(*state); + + //! The finalize method can return this value from final_analyze to indicate it should not be used. + if (score == DConstants::INVALID_INDEX) { + continue; + } + + if (score < best_score || forced_method_found) { + compression_idx = j; + best_score = score; + chosen_state = std::move(state); + } + //! If we have found the forced method, we're done + if (forced_method_found) { + break; + } } - //! If we have found the forced method, we're done - if (forced_method_found) { - break; + + if (!chosen_state) { + auto &checkpoint_state = checkpoint_states[i]; + auto &col_data = checkpoint_state.get().column_data; + throw FatalException("No suitable compression/storage method found to store column of type %s", + col_data.type.ToString()); } + D_ASSERT(compression_idx != DConstants::INVALID_INDEX); + result[i] = CheckpointAnalyzeResult(std::move(chosen_state), *functions[compression_idx]); } - return state; + return result; } -void ColumnDataCheckpointer::WriteToDisk(const column_segment_vector_t &nodes) { - // there were changes or transient segments - // we need to rewrite the column segments to disk - +void ColumnDataCheckpointer::DropSegments() { // first we check the current segments // if there are any persistent segments, we will mark their old block ids as modified // since the segments will be rewritten their old on disk data is no longer required - for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { - auto segment = nodes[segment_idx].node.get(); - segment->CommitDropSegment(); + + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + if (!has_changes[i]) { + continue; + } + + auto &state = checkpoint_states[i]; + auto &col_data = state.get().column_data; + auto &nodes = col_data.data.ReferenceSegments(); + + // Drop the segments, as we'll be replacing them with new ones, because there are changes + for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { + auto segment = nodes[segment_idx].node.get(); + segment->CommitDropSegment(); + } } +} + +void ColumnDataCheckpointer::WriteToDisk() { + DropSegments(); + + // Analyze the candidate functions to select one of them to use for compression + auto analyze_result = DetectBestCompressionMethod(); + + // Initialize the compression for the selected function + D_ASSERT(analyze_result.size() == checkpoint_states.size()); + vector checkpoint_data(checkpoint_states.size()); + vector> 
compression_states(checkpoint_states.size()); + for (idx_t i = 0; i < analyze_result.size(); i++) { + if (!has_changes[i]) { + continue; + } + auto &analyze_state = analyze_result[i].analyze_state; + auto &function = analyze_result[i].function; - // now we need to write our segment - // we will first run an analyze step that determines which compression function to use - idx_t compression_idx; - auto analyze_state = DetectBestCompressionMethod(nodes, compression_idx); + auto &checkpoint_state = checkpoint_states[i]; + auto &col_data = checkpoint_state.get().column_data; - if (!analyze_state) { - throw FatalException("No suitable compression/storage method found to store column"); + checkpoint_data[i] = + ColumnDataCheckpointData(checkpoint_state, col_data, col_data.GetDatabase(), row_group, checkpoint_info); + compression_states[i] = function->init_compression(checkpoint_data[i], std::move(analyze_state)); } - // now that we have analyzed the compression functions we can start writing to disk - auto best_function = compression_functions[compression_idx]; - auto compress_state = best_function->init_compression(*this, std::move(analyze_state)); + // Scan over the existing segment + changes and compress the data + ScanSegments([&](Vector &scan_vector, idx_t count) { + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + if (!has_changes[i]) { + continue; + } + auto &function = analyze_result[i].function; + auto &compression_state = compression_states[i]; + function->compress(*compression_state, scan_vector, count); + } + }); - ScanSegments( - nodes, [&](Vector &scan_vector, idx_t count) { best_function->compress(*compress_state, scan_vector, count); }); - best_function->compress_finalize(*compress_state); + // Finalize the compression + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + if (!has_changes[i]) { + continue; + } + auto &function = analyze_result[i].function; + auto &compression_state = compression_states[i]; + function->compress_finalize(*compression_state); + } } -bool ColumnDataCheckpointer::HasChanges(const column_segment_vector_t &nodes) { +bool ColumnDataCheckpointer::HasChanges(ColumnData &col_data) { + auto &nodes = col_data.data.ReferenceSegments(); for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { auto segment = nodes[segment_idx].node.get(); if (segment->segment_type == ColumnSegmentType::TRANSIENT) { @@ -223,9 +340,13 @@ bool ColumnDataCheckpointer::HasChanges(const column_segment_vector_t &nodes) { return false; } -void ColumnDataCheckpointer::WritePersistentSegments(column_segment_vector_t nodes) { +void ColumnDataCheckpointer::WritePersistentSegments(ColumnCheckpointState &state) { // all segments are persistent and there are no updates // we only need to write the metadata + + auto &col_data = state.column_data; + auto nodes = col_data.data.MoveSegments(); + for (idx_t segment_idx = 0; segment_idx < nodes.size(); segment_idx++) { auto segment = nodes[segment_idx].node.get(); auto pointer = segment->GetDataPointer(); @@ -240,28 +361,51 @@ void ColumnDataCheckpointer::WritePersistentSegments(column_segment_vector_t nod } } -void ColumnDataCheckpointer::Checkpoint(const column_segment_vector_t &nodes) { - D_ASSERT(!nodes.empty()); - has_changes = HasChanges(nodes); - // first check if any of the segments have changes - if (has_changes) { - WriteToDisk(nodes); +void ColumnDataCheckpointer::Checkpoint() { + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + auto &state = checkpoint_states[i]; + auto &col_data = 
state.get().column_data; + has_changes.push_back(HasChanges(col_data)); } -} -void ColumnDataCheckpointer::FinalizeCheckpoint(column_segment_vector_t &&nodes) { - if (has_changes) { - nodes.clear(); - } else { - WritePersistentSegments(std::move(nodes)); + bool any_has_changes = false; + for (idx_t i = 0; i < has_changes.size(); i++) { + if (has_changes[i]) { + any_has_changes = true; + break; + } + } + if (!any_has_changes) { + // Nothing has undergone any changes, no need to checkpoint + // just move on to finalizing + return; } + + WriteToDisk(); } -CompressionFunction &ColumnDataCheckpointer::GetCompressionFunction(CompressionType compression_type) { - auto &db = GetDatabase(); - auto &column_type = GetType(); - auto &config = DBConfig::GetConfig(db); - return *config.GetCompressionFunction(compression_type, column_type.InternalType()); +void ColumnDataCheckpointer::FinalizeCheckpoint() { + for (idx_t i = 0; i < checkpoint_states.size(); i++) { + auto &state = checkpoint_states[i].get(); + auto &col_data = state.column_data; + if (has_changes[i]) { + // Move the existing segments out of the column data + // they will be destructed at the end of the scope + auto to_delete = col_data.data.MoveSegments(); + } else { + WritePersistentSegments(state); + } + + // reset the compression function + col_data.compression.reset(); + // replace the old tree with the new one + auto new_segments = state.new_tree.MoveSegments(); + auto l = col_data.data.Lock(); + for (auto &new_segment : new_segments) { + col_data.AppendSegment(l, std::move(new_segment.node)); + } + col_data.ClearUpdates(); + } } } // namespace duckdb diff --git a/src/duckdb/src/storage/table/standard_column_data.cpp b/src/duckdb/src/storage/table/standard_column_data.cpp index 506d04827..6b6311c3d 100644 --- a/src/duckdb/src/storage/table/standard_column_data.cpp +++ b/src/duckdb/src/storage/table/standard_column_data.cpp @@ -232,10 +232,29 @@ unique_ptr StandardColumnData::Checkpoint(RowGroup &row_g // to prevent reading the validity data immediately after it is checkpointed we first checkpoint the main column // this is necessary for concurrent checkpointing as due to the partial block manager checkpointed data might be // flushed to disk by a different thread than the one that wrote it, causing a data race - auto base_state = ColumnData::Checkpoint(row_group, checkpoint_info); - auto validity_state = validity.Checkpoint(row_group, checkpoint_info); + auto base_state = CreateCheckpointState(row_group, checkpoint_info.info.manager); + base_state->global_stats = BaseStatistics::CreateEmpty(type).ToUnique(); + auto validity_state_p = validity.CreateCheckpointState(row_group, checkpoint_info.info.manager); + validity_state_p->global_stats = BaseStatistics::CreateEmpty(validity.type).ToUnique(); + + auto &validity_state = *validity_state_p; auto &checkpoint_state = base_state->Cast(); - checkpoint_state.validity_state = std::move(validity_state); + checkpoint_state.validity_state = std::move(validity_state_p); + + auto &nodes = data.ReferenceSegments(); + if (nodes.empty()) { + // empty table: flush the empty list + return base_state; + } + + vector> checkpoint_states; + checkpoint_states.emplace_back(checkpoint_state); + checkpoint_states.emplace_back(validity_state); + + ColumnDataCheckpointer checkpointer(checkpoint_states, GetDatabase(), row_group, checkpoint_info); + checkpointer.Checkpoint(); + checkpointer.FinalizeCheckpoint(); + return base_state; }
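This last hunk shows the payoff of the checkpointer rework: instead of checkpointing the base column and its validity column in two independent passes, StandardColumnData hands both checkpoint states to one ColumnDataCheckpointer, which scans the segments once and compresses every column that has changes. A condensed sketch of the call pattern, assuming a hypothetical CheckpointTogether wrapper; the ColumnDataCheckpointer constructor and the two calls match the signatures introduced in column_data_checkpointer.hpp above:

namespace duckdb {

// Hypothetical wrapper around the real entry points; error handling and the
// empty-segment-list early-out are elided.
void CheckpointTogether(ColumnCheckpointState &base_state, ColumnCheckpointState &validity_state,
                        DatabaseInstance &db, RowGroup &row_group, ColumnCheckpointInfo &checkpoint_info) {
	vector<reference<ColumnCheckpointState>> checkpoint_states;
	checkpoint_states.emplace_back(base_state);
	checkpoint_states.emplace_back(validity_state);

	ColumnDataCheckpointer checkpointer(checkpoint_states, db, row_group, checkpoint_info);
	// Per column: detect changes, run the analyze phase over each candidate
	// compression function, then compress all changed columns in one scan.
	checkpointer.Checkpoint();
	// Per column: drop the rewritten segments (or persist unchanged ones),
	// install the new segment tree, and clear outstanding updates.
	checkpointer.FinalizeCheckpoint();
}

} // namespace duckdb

Columns whose has_changes entry is false skip compression entirely; FinalizeCheckpoint only rewrites their metadata via WritePersistentSegments.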