Skip to content

Commit

Permalink
[Feature][Materialized-View] support materialized view on vectorized …
Browse files Browse the repository at this point in the history
…engine (#10792)
  • Loading branch information
BiteTheDDDDt authored Aug 4, 2022
1 parent 35f2632 commit ec3c911
Show file tree
Hide file tree
Showing 20 changed files with 100 additions and 153 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
*.thrift text eol=lf
*.proto text eol=lf
*.conf text eol=lf
*.out text eol=lf -diff
40 changes: 0 additions & 40 deletions be/src/olap/hll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,44 +407,4 @@ void HllSetResolver::parse() {
}
}

// Serializes a sparse HLL register map into the caller-provided buffer `result`.
//
// Bytes written:
//   result[0]  : HLL_DATA_SPARSE type tag
//   result + 1 : number of registers, stored as an int
//   then one 3-byte entry per register: index low byte, index high byte, value.
//
// `len` is set to the header size plus the size of all entries.
// NOTE(review): the caller presumably guarantees `result` is large enough — confirm.
void HllSetHelper::set_sparse(char* result, const std::map<int, uint8_t>& index_to_value,
                              int& len) {
    result[0] = HLL_DATA_SPARSE;

    const int entry_count = index_to_value.size();

    // Entries start right after the type tag and the count field.
    len = sizeof(HllSetResolver::SetTypeValueType) + sizeof(HllSetResolver::SparseLengthValueType);
    char* cursor = result + len;
    for (const auto& entry : index_to_value) {
        *cursor++ = (char)(entry.first & 0xff);
        *cursor++ = (char)(entry.first >> 8 & 0xff);
        *cursor++ = entry.second;
    }

    len += entry_count *
           (sizeof(HllSetResolver::SparseIndexType) + sizeof(HllSetResolver::SparseValueType));
    // The count is stored last, exactly as before, so the write order into the buffer is unchanged.
    *(int*)(result + 1) = entry_count;
}

// Serializes an explicit HLL hash set into the caller-provided buffer `result`.
//
// Bytes written:
//   result[0] : HLL_DATA_EXPLICIT type tag
//   result[1] : number of hash values, cast to ExplicitLengthValueType
//   then every hash value as a raw uint64_t, in the set's iteration order.
//
// `len` is set to the header size plus 8 bytes per hash value.
void HllSetHelper::set_explicit(char* result, const std::set<uint64_t>& hash_value_set, int& len) {
    const auto header_size = sizeof(HllSetResolver::SetTypeValueType) +
                             sizeof(HllSetResolver::ExplicitLengthValueType);

    result[0] = HLL_DATA_EXPLICIT;
    result[1] = (HllSetResolver::ExplicitLengthValueType)(hash_value_set.size());

    // Hash values are laid out back to back immediately after the header.
    uint64_t* out = (uint64_t*)(result + header_size);
    for (uint64_t hash_value : hash_value_set) {
        *out++ = hash_value;
    }

    len = header_size + sizeof(uint64_t) * hash_value_set.size();
}

// Serializes a full HLL register array into the caller-provided buffer `result`.
//
// result[0] receives the HLL_DATA_FULL type tag; each (index, value) pair in
// `index_to_value` is stored at result[1 + index]. Registers that are absent
// from the map are not written by this function.
//
// `len` is set to `registers_len` plus the size of the type tag.
void HllSetHelper::set_full(char* result, const std::map<int, uint8_t>& index_to_value,
                            const int registers_len, int& len) {
    result[0] = HLL_DATA_FULL;

    // Register slots begin directly after the type tag.
    char* registers = result + 1;
    for (const auto& entry : index_to_value) {
        registers[entry.first] = entry.second;
    }

    len = registers_len + sizeof(HllSetResolver::SetTypeValueType);
}

} // namespace doris
9 changes: 0 additions & 9 deletions be/src/olap/hll.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,13 +370,4 @@ class HllSetResolver {
SparseLengthValueType* _sparse_count;
};

// TODO(kks): remove this class once the dpp_sink class has been removed.
//
// Static helpers that serialize HLL data into a caller-provided buffer.
// Each helper stores the set-type tag in result[0] and reports the number of
// bytes of the serialized form through the `len` out-parameter.
class HllSetHelper {
public:
// Writes the sparse form: type tag, register count, then one
// (index, value) entry per element of `index_to_value`.
static void set_sparse(char* result, const std::map<int, uint8_t>& index_to_value, int& len);
// Writes the explicit form: type tag, hash count, then each hash value
// of `hash_value_set` as a uint64_t.
static void set_explicit(char* result, const std::set<uint64_t>& hash_value_set, int& len);
// Writes the full form: type tag, then each value of `index_to_value` at
// its register index; `len` becomes `set_len` plus the size of the type tag.
static void set_full(char* result, const std::map<int, uint8_t>& index_to_value,
const int set_len, int& len);
};

} // namespace doris
10 changes: 4 additions & 6 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1001,16 +1001,15 @@ uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro

Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector,
uint16_t* sel_rowid_idx, size_t select_size,
vectorized::MutableColumns* mutable_columns) {
uint16_t* sel_rowid_idx, size_t select_size) {
SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
std::vector<rowid_t> rowids(select_size);
for (size_t i = 0; i < select_size; ++i) {
rowids[i] = rowid_vector[sel_rowid_idx[i]];
}
for (auto cid : read_column_ids) {
auto& column = (*mutable_columns)[cid];
RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, column));
RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
_current_return_columns[cid]));
}

return Status::OK();
Expand Down Expand Up @@ -1117,8 +1116,7 @@ Status SegmentIterator::next_batch(vectorized::Block* block) {

// step3: read non_predicate column
RETURN_IF_ERROR(_read_columns_by_rowids(_non_predicate_columns, _block_rowids,
sel_rowid_idx, selected_size,
&_current_return_columns));
sel_rowid_idx, selected_size));

// step4: output columns
// 4.1 output non-predicate column
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class SegmentIterator : public RowwiseIterator {
void _output_non_pred_columns(vectorized::Block* block);
Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
std::vector<rowid_t>& rowid_vector, uint16_t* sel_rowid_idx,
size_t select_size, vectorized::MutableColumns* mutable_columns);
size_t select_size);

template <class Container>
Status _output_column_by_sel_idx(vectorized::Block* block, const Container& column_ids,
Expand Down
48 changes: 22 additions & 26 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -785,22 +785,11 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
}

// material-view or rollup task will fail now
if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
return Status::NotSupported(
"_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
"not supported now. ");
}

std::vector<bool> nullable_tuples;
for (int i = 0; i < ref_block->columns(); i++) {
nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
}

ObjectPool pool;
RuntimeState* state = pool.add(new RuntimeState());
state->set_desc_tbl(&_desc_tbl);
RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
RowDescriptor row_desc =
RowDescriptor(_desc_tbl.get_tuple_descriptor(_desc_tbl.get_row_tuples()[0]), false);

const int row_size = ref_block->rows();
const int column_size = new_block->columns();
Expand All @@ -811,10 +800,6 @@ Status RowBlockChanger::change_block(vectorized::Block* ref_block,
for (int idx = 0; idx < column_size; idx++) {
int ref_idx = _schema_mapping[idx].ref_column;

if (!_schema_mapping[idx].materialized_function.empty()) {
return Status::NotSupported("Materialized function not supported now. ");
}

if (ref_idx < 0) {
// new column, write default value
auto value = _schema_mapping[idx].default_value;
Expand Down Expand Up @@ -1547,15 +1532,14 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea
rowset_reader->next_block(ref_block.get());
while (ref_block->rows()) {
RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
if (!_mem_tracker->check_limit(config::memory_limitation_per_thread_for_schema_change_bytes,
new_block->allocated_bytes())) {
if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
RETURN_IF_ERROR(create_rowset());

if (!_mem_tracker->check_limit(
config::memory_limitation_per_thread_for_schema_change_bytes,
new_block->allocated_bytes())) {
if (!_mem_tracker->check_limit(_memory_limitation, new_block->allocated_bytes())) {
LOG(WARNING) << "Memory limitation is too small for Schema Change."
<< "memory_limitation=" << _memory_limitation;
<< " _memory_limitation=" << _memory_limitation
<< ", new_block->allocated_bytes()=" << new_block->allocated_bytes()
<< ", consumption=" << _mem_tracker->consumption();
return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
}
}
Expand Down Expand Up @@ -1649,9 +1633,8 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
rs_readers.push_back(rs_reader);
}
// Use the rowset schema as the current schema if it exists; the rowset schema must be newer than the tablet schema.
auto max_version_rowset = src_rowsets.back();
const TabletSchema* cur_tablet_schema =
max_version_rowset->rowset_meta()->tablet_schema().get();
src_rowsets.back()->rowset_meta()->tablet_schema().get();
if (cur_tablet_schema == nullptr) {
cur_tablet_schema = new_tablet->tablet_schema().get();
}
Expand Down Expand Up @@ -1680,6 +1663,12 @@ Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_
rs_readers.push_back(rs_reader);
}

// Use the rowset schema as the current schema if it exists; the rowset schema must be newer than the tablet schema.
auto cur_tablet_schema = src_rowsets.back()->rowset_meta()->tablet_schema();
if (cur_tablet_schema == nullptr) {
cur_tablet_schema = new_tablet->tablet_schema();
}

Merger::Statistics stats;
RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE,
new_tablet->tablet_schema().get(), rs_readers,
Expand Down Expand Up @@ -1717,6 +1706,7 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req

// Definitions of SchemaChangeHandler's static data members.
std::shared_mutex SchemaChangeHandler::_mutex;
std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
// Function names accepted when a materialized-view expression is a
// FUNCTION_CALL node; _do_process_alter_tablet_v2 returns NotSupported for
// any function name that is not in this set.
std::set<std::string> SchemaChangeHandler::_supported_functions = {"hll_hash", "to_bitmap"};

// In the past, schema change and rollup would create a new tablet and then wait for the txns that started before the task to finish.
// That waiting cost a lot of time and made the task very difficult to understand.
Expand Down Expand Up @@ -1848,7 +1838,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
LOG(WARNING) << "New tablet has a version " << pair.first
<< " crossing base tablet's max_version="
<< max_rowset->end_version();
Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
return Status::OLAPInternalError(OLAP_ERR_VERSION_ALREADY_MERGED);
}
}
std::vector<RowsetSharedPtr> empty_vec;
Expand Down Expand Up @@ -1949,9 +1939,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
if (item.__isset.mv_expr) {
if (item.mv_expr.nodes[0].node_type == TExprNodeType::FUNCTION_CALL) {
mv_param.mv_expr = item.mv_expr.nodes[0].fn.name.function_name;
if (!_supported_functions.count(mv_param.mv_expr)) {
return Status::NotSupported("Unknow materialized view expr " +
mv_param.mv_expr);
}
} else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
mv_param.mv_expr = "count_field";
}

mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
}
sc_params.materialized_params_map.insert(
Expand Down Expand Up @@ -2152,6 +2147,7 @@ Status SchemaChangeHandler::_parse_request(
const TabletColumn& new_column = new_tablet->tablet_schema()->column(i);
const string& column_name = new_column.name();
ColumnMapping* column_mapping = rb_changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;

if (new_column.has_reference_column()) {
int32_t column_index = base_tablet_schema->field_index(new_column.referenced_column());
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ class SchemaChangeHandler {

static std::shared_mutex _mutex;
static std::unordered_set<int64_t> _tablet_ids_in_converting;
static std::set<std::string> _supported_functions;
};

using RowBlockDeleter = std::function<void(RowBlock*)>;
Expand Down
5 changes: 0 additions & 5 deletions be/src/runtime/descriptors.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,6 @@ class RowDescriptor {
RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
const std::vector<bool>& nullable_tuples);

static RowDescriptor create_default(const DescriptorTbl& desc_tbl,
const std::vector<bool>& nullable_tuples) {
return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples);
}

// standard copy c'tor, made explicit here
RowDescriptor(const RowDescriptor& desc)
: _tuple_desc_map(desc._tuple_desc_map),
Expand Down
1 change: 0 additions & 1 deletion be/src/runtime/primitive_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ struct PrimitiveTypeTraits<TYPE_HLL> {
using ColumnType = vectorized::ColumnString;
};

// only for adapt get_predicate_column_ptr
template <PrimitiveType type>
struct PredicatePrimitiveTypeTraits {
using PredicateFieldType = typename PrimitiveTypeTraits<type>::CppType;
Expand Down
50 changes: 18 additions & 32 deletions be/src/vec/columns/column_complex.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,41 +62,30 @@ class ColumnComplexType final : public COWHelper<IColumn, ColumnComplexType<T>>
data.push_back(*reinterpret_cast<const T*>(pos));
}

void insert_many_binary_data(char* data_array, uint32_t* len_array,
uint32_t* start_offset_array, size_t num) override {
void insert_binary_data(const char* pos, size_t length) {
insert_default();
T* pvalue = &get_element(size() - 1);
if (!length) {
*pvalue = *reinterpret_cast<const T*>(pos);
return;
}

if constexpr (std::is_same_v<T, BitmapValue>) {
for (size_t i = 0; i < num; i++) {
uint32_t len = len_array[i];
uint32_t start_offset = start_offset_array[i];
insert_default();
BitmapValue* pvalue = &get_element(size() - 1);
if (len != 0) {
BitmapValue value;
value.deserialize(data_array + start_offset);
*pvalue = std::move(value);
} else {
*pvalue = std::move(*reinterpret_cast<BitmapValue*>(data_array + start_offset));
}
}
pvalue->deserialize(pos);
} else if constexpr (std::is_same_v<T, HyperLogLog>) {
for (size_t i = 0; i < num; i++) {
uint32_t len = len_array[i];
uint32_t start_offset = start_offset_array[i];
insert_default();
HyperLogLog* pvalue = &get_element(size() - 1);
if (len != 0) {
HyperLogLog value;
value.deserialize(Slice(data_array + start_offset, len));
*pvalue = std::move(value);
} else {
*pvalue = std::move(*reinterpret_cast<HyperLogLog*>(data_array + start_offset));
}
}
pvalue->deserialize(Slice(pos, length));
} else {
LOG(FATAL) << "Unexpected type in column complex";
}
}

// Appends `num` values to the column. Value i starts at
// data_array + start_offset_array[i] and has length len_array[i]; each one is
// handed to insert_binary_data() in order.
void insert_many_binary_data(char* data_array, uint32_t* len_array,
                             uint32_t* start_offset_array, size_t num) override {
    size_t row = 0;
    while (row < num) {
        char* value_begin = data_array + start_offset_array[row];
        insert_binary_data(value_begin, len_array[row]);
        ++row;
    }
}

// Appends a single value-initialized T to the end of the column's data.
void insert_default() override { data.push_back(T()); }

void insert_many_defaults(size_t length) override {
Expand Down Expand Up @@ -299,10 +288,7 @@ template <typename T>
ColumnPtr ColumnComplexType<T>::permute(const IColumn::Permutation& perm, size_t limit) const {
size_t size = data.size();

if (limit == 0)
limit = size;
else
limit = std::min(size, limit);
limit = limit ? std::min(size, limit) : size;

if (perm.size() < limit) {
LOG(FATAL) << "Size of permutation is less than required.";
Expand Down
13 changes: 9 additions & 4 deletions be/src/vec/exec/volap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,18 +252,23 @@ Status VOlapScanner::_init_return_columns(bool need_seq_col) {
if (!slot->is_materialized()) {
continue;
}
int32_t index = slot->col_unique_id() >= 0
? _tablet_schema.field_index(slot->col_unique_id())
: _tablet_schema.field_index(slot->col_name());

int32_t index = _tablet_schema.field_index(slot->col_unique_id());
if (index < 0) {
// rollup/materialized view should use col_name to find index
index = _tablet_schema.field_index(slot->col_name());
}

if (index < 0) {
std::stringstream ss;
ss << "field name is invalid. field=" << slot->col_name();
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}
_return_columns.push_back(index);
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable())
if (slot->is_nullable() && !_tablet_schema.column(index).is_nullable()) {
_tablet_columns_convert_to_null_set.emplace(index);
}
}

// expand the sequence column
Expand Down
11 changes: 6 additions & 5 deletions fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,10 @@ private RollupJobV2() {
super(JobType.ROLLUP);
}

public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs,
long baseIndexId, long rollupIndexId, String baseIndexName, String rollupIndexName,
List<Column> rollupSchema, int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
short rollupShortKeyColumnCount, OriginStatement origStmt) {
public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long timeoutMs, long baseIndexId,
long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType, short rollupShortKeyColumnCount,
OriginStatement origStmt) {
super(jobId, JobType.ROLLUP, dbId, tableId, tableName, timeoutMs);

this.baseIndexId = baseIndexId;
Expand All @@ -150,6 +150,7 @@ public RollupJobV2(long jobId, long dbId, long tableId, String tableName, long t
this.rollupIndexName = rollupIndexName;

this.rollupSchema = rollupSchema;

this.baseSchemaHash = baseSchemaHash;
this.rollupSchemaHash = rollupSchemaHash;
this.rollupKeysType = rollupKeysType;
Expand Down Expand Up @@ -376,8 +377,8 @@ protected void runWaitingTxnJob() throws AlterCancelException {

List<Column> fullSchema = tbl.getBaseSchema(true);
DescriptorTable descTable = new DescriptorTable();
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
for (Column column : fullSchema) {
TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
destSlotDesc.setIsMaterialized(true);
destSlotDesc.setColumn(column);
Expand Down
Loading

0 comments on commit ec3c911

Please sign in to comment.