Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Variant sparse merge #47317

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
25ca020
refactor variant flush
eldenmoon Nov 27, 2024
92125f3
add stats
eldenmoon Dec 10, 2024
aff912b
[opt](variant) add serialized sparse column (#45252)
csun5285 Dec 10, 2024
553db18
[fix] (variant) add serialize and deserialize (#45487)
csun5285 Dec 16, 2024
824ed73
refactor and implement sparse column reader and stats(#45492)
eldenmoon Dec 16, 2024
7434453
[fix] (variant) remove nullable type in sparse column and implement r…
csun5285 Dec 17, 2024
19e5a4b
fix 1 (#45517)
eldenmoon Dec 17, 2024
01ad180
fix 2 (#45538)
eldenmoon Dec 17, 2024
52e9e14
fix3 (#45554)
csun5285 Dec 17, 2024
989aa0f
fix 4 (#45601)
eldenmoon Dec 18, 2024
ba260c3
fix 5 (#45604)
csun5285 Dec 19, 2024
1c62ffe
fix 6 (#45633)
eldenmoon Dec 19, 2024
89b3d41
fix 7 (#45680)
eldenmoon Dec 19, 2024
c15122c
[fix](serialize) fix column serialize and deserialize (#45667)
csun5285 Dec 19, 2024
4472b88
fix serialize with root (#45737)
eldenmoon Dec 20, 2024
cfd2c1c
fix serialize (#45811)
eldenmoon Dec 23, 2024
c29e17d
fix 9 (#45822)
eldenmoon Dec 23, 2024
47b6deb
fix 10 (#45831)
eldenmoon Dec 25, 2024
5ec1358
random from 2, 8 (#45903)
eldenmoon Dec 25, 2024
f206e35
add debug point test (#45914)
eldenmoon Dec 26, 2024
5f9eae7
fix reader (#46042)
eldenmoon Dec 26, 2024
a0b86cc
fix reader should starts with prefix (#46060)
eldenmoon Dec 30, 2024
35d63b8
fix serialize (#46185)
eldenmoon Dec 31, 2024
a279532
fix some cases (#46202)
eldenmoon Dec 31, 2024
d89ac05
[test](variant) add ut (#45947)
csun5285 Jan 6, 2025
55bf035
merge master (#46486)
eldenmoon Jan 7, 2025
6fd759a
[opt] optimize insert_range_from
eldenmoon Jan 9, 2025
f7b27ae
Merge remote-tracking branch 'upstream-apache/master' into variant-sp…
eldenmoon Jan 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1021,9 +1021,8 @@ DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");

// Whether to use the schema dict on the backend side instead of the MetaService side (cloud mode)
DEFINE_mBool(variant_use_cloud_schema_dict, "true");
DEFINE_mDouble(variant_ratio_of_defaults_as_sparse_column, "1");
DEFINE_mInt64(variant_threshold_rows_to_estimate_sparse_column, "2048");
DEFINE_mBool(variant_throw_exeception_on_invalid_json, "false");
// Mutable int32 config, default "5".
// NOTE(review): presumably caps how many variant subcolumns are kept as separate
// columns before the remainder is stored in the sparse column — confirm against the writer.
DEFINE_mInt32(variant_max_subcolumns_count, "5");

// block file cache
DEFINE_Bool(enable_file_cache, "false");
Expand Down
6 changes: 2 additions & 4 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1218,15 +1218,13 @@ DECLARE_mInt64(lookup_connection_cache_capacity);
// level of compression when using LZ4_HC, whose default value is LZ4HC_CLEVEL_DEFAULT
DECLARE_mInt64(LZ4_HC_compression_level);
// Threshold of a column as sparse column
// Notice: TEST ONLY
DECLARE_mDouble(variant_ratio_of_defaults_as_sparse_column);
DECLARE_mBool(variant_use_cloud_schema_dict);
// Threshold of rows used to estimate whether a column is sparse
// Notice: TEST ONLY
DECLARE_mInt64(variant_threshold_rows_to_estimate_sparse_column);
// Treat invalid json format str as string, instead of throwing exception if false
DECLARE_mBool(variant_throw_exeception_on_invalid_json);

// NOTE(review): presumably the maximum number of variant subcolumns kept as separate
// columns; anything beyond it would go to the sparse column — confirm against the writer.
DECLARE_mInt32(variant_max_subcolumns_count);

DECLARE_mBool(enable_merge_on_write_correctness_check);
// USED FOR DEBUGGING
// core directly if the compaction found there's duplicate key on mow table
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ Status Compaction::merge_input_rowsets() {
}

RowsetWriterContext ctx;
ctx.input_rs_readers = input_rs_readers;
RETURN_IF_ERROR(construct_output_rowset_writer(ctx));

// write merged rows to output rowset
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/rowset_writer_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ struct RowsetWriterContext {
// For remote rowset
std::optional<StorageResource> storage_resource;

// For collect segment statistics for compaction
std::vector<RowsetReaderSharedPtr> input_rs_readers;

bool is_local_rowset() const { return !storage_resource; }

std::string segment_path(int seg_id) const {
Expand Down
333 changes: 263 additions & 70 deletions be/src/olap/rowset/segment_v2/column_reader.cpp

Large diffs are not rendered by default.

116 changes: 78 additions & 38 deletions be/src/olap/rowset/segment_v2/column_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "olap/rowset/segment_v2/page_handle.h" // for PageHandle
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/parsed_page.h" // for ParsedPage
#include "olap/rowset/segment_v2/stream_reader.h"
#include "olap/types.h"
#include "olap/utils.h"
#include "util/once.h"
Expand Down Expand Up @@ -78,6 +79,7 @@ class InvertedIndexFileReader;
class PageDecoder;
class RowRanges;
class ZoneMapIndexReader;
struct VariantStatistics;

struct ColumnReaderOptions {
// whether to verify the checksum when reading a page
Expand Down Expand Up @@ -112,11 +114,16 @@ struct ColumnIteratorOptions {
// This will cache data shared by all reader
class ColumnReader : public MetadataAdder<ColumnReader> {
public:
ColumnReader() = default;
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
static Status create(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
uint32_t column_id, uint64_t num_rows,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
static Status create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
Expand All @@ -129,11 +136,16 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
static Status create_agg_state(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
static Status create_variant(const ColumnReaderOptions& opts, const SegmentFooterPB& footer,
uint32_t column_id, uint64_t num_rows,
const io::FileReaderSPtr& file_reader,
std::unique_ptr<ColumnReader>* reader);
enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING };

virtual ~ColumnReader();
~ColumnReader() override;

// create a new column iterator. Client should delete returned iterator
virtual Status new_iterator(ColumnIterator** iterator, const TabletColumn& col);
Status new_iterator(ColumnIterator** iterator);
Status new_array_iterator(ColumnIterator** iterator);
Status new_struct_iterator(ColumnIterator** iterator);
Expand Down Expand Up @@ -206,7 +218,9 @@ class ColumnReader : public MetadataAdder<ColumnReader> {

void disable_index_meta_cache() { _use_index_page_cache = false; }

FieldType get_meta_type() { return _meta_type; }
virtual FieldType get_meta_type() { return _meta_type; }

int64_t get_metadata_size() const override;

private:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
Expand Down Expand Up @@ -251,8 +265,6 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges,
const ColumnIteratorOptions& iter_opts);

int64_t get_metadata_size() const override;

private:
int64_t _meta_length;
FieldType _meta_type;
Expand Down Expand Up @@ -291,6 +303,34 @@ class ColumnReader : public MetadataAdder<ColumnReader> {
DorisCallOnce<Status> _set_dict_encoding_type_once;
};

// ColumnReader specialisation for variant columns.
// In addition to the base reader state it owns a tree of subcolumn readers, a
// dedicated reader for the sparse column and a VariantStatistics object (see the
// private members below).
class VariantColumnReader : public ColumnReader {
public:
VariantColumnReader() = default;

// Second-phase initialisation from the segment footer for column `column_id`.
// NOTE(review): the definition is not in this header; presumably this is what
// populates the private members below — confirm.
Status init(const ColumnReaderOptions& opts, const SegmentFooterPB& footer, uint32_t column_id,
uint64_t num_rows, io::FileReaderSPtr file_reader);
// Variant-specific override of ColumnReader::new_iterator for tablet column `col`.
Status new_iterator(ColumnIterator** iterator, const TabletColumn& col) override;

// Looks up the subcolumn reader node for `relative_path`.
// NOTE(review): presumably returns nullptr when the path is unknown — confirm.
const SubcolumnColumnReaders::Node* get_reader_by_path(
const vectorized::PathInData& relative_path) const;

~VariantColumnReader() override = default;

// Always reports the variant field type instead of the base class's stored _meta_type.
FieldType get_meta_type() override { return FieldType::OLAP_FIELD_TYPE_VARIANT; }

// Non-owning pointer to the statistics object; nullptr while _statistics is unset.
const VariantStatistics* get_stats() const { return _statistics.get(); }

int64_t get_metadata_size() const override;

private:
// Creates an iterator for `path` given its reader `node` and the tree `root`.
// NOTE(review): presumably merges all subcolumns under `path` — confirm in the .cpp.
Status _create_hierarchical_reader(ColumnIterator** reader, vectorized::PathInData path,
const SubcolumnColumnReaders::Node* node,
const SubcolumnColumnReaders::Node* root);
// Owned reader tree; the declaration order of these members fixes their
// construction and destruction order.
std::unique_ptr<SubcolumnColumnReaders> _subcolumn_readers;
std::unique_ptr<ColumnReader> _sparse_column_reader;
std::unique_ptr<VariantStatistics> _statistics;
};

// Base iterator to read one column data
class ColumnIterator {
public:
Expand Down Expand Up @@ -634,40 +674,40 @@ class RowIdColumnIterator : public ColumnIterator {
int32_t _segment_id = 0;
};

// ColumnIterator that wraps a FileColumnIterator and forwards init / seek /
// ordinal queries to it. next_batch and read_by_rowids are defined out of line.
class VariantRootColumnIterator : public ColumnIterator {
public:
VariantRootColumnIterator() = delete;

// Takes ownership of `iter`; it is destroyed together with this object.
explicit VariantRootColumnIterator(FileColumnIterator* iter) { _inner_iter.reset(iter); }

~VariantRootColumnIterator() override = default;

Status init(const ColumnIteratorOptions& opts) override { return _inner_iter->init(opts); }

Status seek_to_first() override { return _inner_iter->seek_to_first(); }

Status seek_to_ordinal(ordinal_t ord_idx) override {
return _inner_iter->seek_to_ordinal(ord_idx);
}

// Convenience overload for callers that do not need the has_null flag; the
// value written to the local flag is discarded.
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
bool has_null;
return next_batch(n, dst, &has_null);
}

Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;

Status read_by_rowids(const rowid_t* rowids, const size_t count,
vectorized::MutableColumnPtr& dst) override;

ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); }

private:
// NOTE(review): defined out of line; presumably post-processes the root column
// that was read into `dst` using `most_common_type` — confirm in the .cpp.
Status _process_root_column(vectorized::MutableColumnPtr& dst,
vectorized::MutableColumnPtr& root_column,
const vectorized::DataTypePtr& most_common_type);
// Owned underlying iterator that all forwarding calls go through.
std::unique_ptr<FileColumnIterator> _inner_iter;
};
// class VariantRootColumnIterator : public ColumnIterator {
// public:
// VariantRootColumnIterator() = delete;
//
// explicit VariantRootColumnIterator(FileColumnIterator* iter) { _inner_iter.reset(iter); }
//
// ~VariantRootColumnIterator() override = default;
//
// Status init(const ColumnIteratorOptions& opts) override { return _inner_iter->init(opts); }
//
// Status seek_to_first() override { return _inner_iter->seek_to_first(); }
//
// Status seek_to_ordinal(ordinal_t ord_idx) override {
// return _inner_iter->seek_to_ordinal(ord_idx);
// }
//
// Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
// bool has_null;
// return next_batch(n, dst, &has_null);
// }
//
// Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst, bool* has_null) override;
//
// Status read_by_rowids(const rowid_t* rowids, const size_t count,
// vectorized::MutableColumnPtr& dst) override;
//
// ordinal_t get_current_ordinal() const override { return _inner_iter->get_current_ordinal(); }
//
// private:
// Status _process_root_column(vectorized::MutableColumnPtr& dst,
// vectorized::MutableColumnPtr& root_column,
// const vectorized::DataTypePtr& most_common_type);
// std::unique_ptr<FileColumnIterator> _inner_iter;
// };

// This iterator is used to read default value column
class DefaultValueColumnIterator : public ColumnIterator {
Expand Down
61 changes: 57 additions & 4 deletions be/src/olap/rowset/segment_v2/column_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "olap/rowset/segment_v2/page_builder.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/variant_column_writer_impl.h"
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/tablet_schema.h"
#include "olap/types.h"
Expand Down Expand Up @@ -292,6 +293,14 @@ Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
return Status::OK();
}

// Builds a VariantColumnWriter for `column` and stores it in `*writer`, replacing
// whatever `*writer` previously owned. The Field is created from `*column` via
// FieldFactory and handed to the new writer. `file_writer` is not used in this
// function. Always returns Status::OK().
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
                                           const TabletColumn* column, io::FileWriter* file_writer,
                                           std::unique_ptr<ColumnWriter>* writer) {
    std::unique_ptr<Field> variant_field(FieldFactory::create(*column));
    writer->reset(new VariantColumnWriter(opts, column, std::move(variant_field)));
    return Status::OK();
}

// TODO(Amory): this should dispatch according to nullable and offset, and needs to be split into sub-functions to simplify it
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
Expand Down Expand Up @@ -320,10 +329,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_VARIANT: {
// Use ScalarColumnWriter to write only its root data
std::unique_ptr<ColumnWriter> writer_local = std::unique_ptr<ColumnWriter>(
new ScalarColumnWriter(opts, std::move(field), file_writer));
*writer = std::move(writer_local);
RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
return Status::OK();
}
default:
Expand Down Expand Up @@ -1158,4 +1164,51 @@ Status MapColumnWriter::write_inverted_index() {
return Status::OK();
}

// Constructs the writer facade: the base ColumnWriter receives the Field and the
// nullability flag read from opts.meta, and the actual work is delegated to a
// freshly created VariantColumnWriterImpl built from the same options and column.
VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts,
                                         const TabletColumn* column, std::unique_ptr<Field> field)
        : ColumnWriter(std::move(field), opts.meta->is_nullable()),
          _impl(std::make_unique<VariantColumnWriterImpl>(opts, column)) {}

// Forwards initialisation to the implementation object and returns its status.
Status VariantColumnWriter::init() {
return _impl->init();
}

// Appends `num_rows` rows starting at `*ptr` by delegating to the implementation.
// The row counter is advanced before delegating, so it is bumped even when the
// implementation returns an error.
// NOTE(review): append_nullable() in this file forwards without touching
// _next_rowid — confirm whether that asymmetry is intended.
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
_next_rowid += num_rows;
return _impl->append_data(ptr, num_rows);
}

// Returns the buffer-size estimate reported by the implementation object.
uint64_t VariantColumnWriter::estimate_buffer_size() {
return _impl->estimate_buffer_size();
}

// Forwards finish() to the implementation object and returns its status.
Status VariantColumnWriter::finish() {
return _impl->finish();
}
// Forwards write_data() to the implementation object and returns its status.
Status VariantColumnWriter::write_data() {
return _impl->write_data();
}
// Forwards write_ordinal_index() to the implementation object and returns its status.
Status VariantColumnWriter::write_ordinal_index() {
return _impl->write_ordinal_index();
}

// Forwards write_zone_map() to the implementation object and returns its status.
Status VariantColumnWriter::write_zone_map() {
return _impl->write_zone_map();
}

// Forwards write_bitmap_index() to the implementation object and returns its status.
Status VariantColumnWriter::write_bitmap_index() {
return _impl->write_bitmap_index();
}
// Forwards write_inverted_index() to the implementation object and returns its status.
Status VariantColumnWriter::write_inverted_index() {
return _impl->write_inverted_index();
}
// Forwards write_bloom_filter_index() to the implementation object and returns its status.
Status VariantColumnWriter::write_bloom_filter_index() {
return _impl->write_bloom_filter_index();
}
// Appends `num_rows` rows together with their `null_map` by delegating to the
// implementation object.
// NOTE(review): unlike append_data(), this path does not advance _next_rowid —
// confirm whether the implementation accounts for these rows elsewhere.
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
return _impl->append_nullable(null_map, ptr, num_rows);
}

} // namespace doris::segment_v2
Loading
Loading