Skip to content

Commit

Permalink
[fix](schema-change) Fix wrong intput column for cast validity check
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Aug 5, 2024
1 parent 455018b commit 057b768
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 57 deletions.
4 changes: 2 additions & 2 deletions be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ struct ColumnMapping {
ColumnMapping() = default;
virtual ~ColumnMapping() = default;

bool has_reference() const { return expr != nullptr || ref_column >= 0; }
bool has_reference() const { return expr != nullptr || ref_column_idx >= 0; }

// <0: use default value
// >=0: use origin column
int32_t ref_column = -1;
int32_t ref_column_idx = -1;
// normally for default value. stores values for filters
WrapperField* default_value = nullptr;
std::shared_ptr<TExpr> expr;
Expand Down
129 changes: 74 additions & 55 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "olap/schema_change.h"

#include <glog/logging.h>

#include <algorithm>
#include <exception>
#include <map>
Expand Down Expand Up @@ -285,52 +287,57 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
}

const int row_size = ref_block->rows();
const int column_size = new_block->columns();
const int row_num = ref_block->rows();
const int new_schema_cols_num = new_block->columns();

// swap ref_block[key] and new_block[value]
// will be used for swaping ref_block[entry.first] and new_block[entry.second]
std::list<std::pair<int, int>> swap_idx_list;
for (int idx = 0; idx < column_size; idx++) {
// just for MV, schema change should not run into this branch
for (int idx = 0; idx < new_schema_cols_num; idx++) {
if (_schema_mapping[idx].expr != nullptr) {
// column_idx in base schema
int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx;
DCHECK_GE(ref_column_idx, 0)
<< "ref column idx must has been set when parsing alter tablet request";
auto& ref_column_def = ref_block->get_by_position(ref_column_idx);

vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx));
RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
RETURN_IF_ERROR(ctx->open(state.get()));

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
if (ref_block->get_by_position(result_column_id).column == nullptr) {
int result_tmp_column_idx = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx));
auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx);
if (result_tmp_column_def.column == nullptr) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} result column is nullptr",
ref_block->get_by_position(result_column_id).name);
"result column={} is nullptr, input column={}", result_tmp_column_def.name,
ref_column_def.name);
}
ref_block->replace_by_position_if_const(result_column_id);
ref_block->replace_by_position_if_const(result_tmp_column_idx);

if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
if (result_tmp_column_def.column->size() != row_num) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name,
row_size, ref_block->get_by_position(result_column_id).column->size());
"result size invalid, expect={}, real={}; ref_column={}", row_num,
ref_column_def.column->size(), ref_column_def.name);
}
RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column,
ref_block->get_by_position(result_column_id).column,
_type));
swap_idx_list.emplace_back(result_column_id, idx);
} else if (_schema_mapping[idx].ref_column < 0) {
RETURN_IF_ERROR(
_check_cast_valid(ref_column_def.column, result_tmp_column_def.column, _type));
swap_idx_list.emplace_back(result_tmp_column_idx, idx);
} else if (_schema_mapping[idx].ref_column_idx < 0) {
// new column, write default value
auto* value = _schema_mapping[idx].default_value;
auto column = new_block->get_by_position(idx).column->assume_mutable();
if (value->is_null()) {
DCHECK(column->is_nullable());
column->insert_many_defaults(row_size);
column->insert_many_defaults(row_num);
} else {
auto type_info = get_type_info(_schema_mapping[idx].new_column);
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
value->ptr(), column, row_size);
value->ptr(), column, row_num);
}
} else {
// same type, just swap column
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column, idx);
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx);
}
}

Expand Down Expand Up @@ -368,78 +375,90 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
return Status::OK();
}

// This check is for MV to prevent schema-change from causing data loss
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column, AlterTabletType type) {
if (ref_column->size() != new_column->size()) {
// This check can prevent schema-change from causing data loss after type cast
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column,
vectorized::ColumnPtr output_column, AlterTabletType type) {
if (input_column->size() != output_column->size()) {
return Status::InternalError(
"column size is changed, ref_column_size={}, new_column_size={}",
ref_column->size(), new_column->size());
"column size is changed, input_column_size={}, output_column_size={}; "
"input_column={}",
input_column->size(), output_column->size(), input_column->get_name());
}
if (type == ROLLUP) {
return Status::OK();
}
if (ref_column->is_nullable() != new_column->is_nullable()) {
if (ref_column->is_nullable()) {
if (input_column->is_nullable() != output_column->is_nullable()) {
if (input_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= ref_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Null data is changed to not nullable");
return Status::DataQualityError(
"some null data is changed to not null, intput_column={}",
input_column->get_name());
}
} else {
const auto& null_map_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column();
const auto& nested_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_nested_column();
const auto* new_null_map = null_map_column.get_data().data();

if (null_map_column.size() != new_column->size() ||
nested_column.size() != new_column->size()) {
DCHECK(false);
if (null_map_column.size() != output_column->size()) {
return Status::InternalError(
"null_map_column size is changed, null_map_column_size={}, "
"new_column_size={}",
null_map_column.size(), new_column->size());
"null_map_column size mismatch output_column_size, "
"null_map_column_size={}, output_column_size={}; input_column={}",
null_map_column.size(), output_column->size(), input_column->get_name());
}

if (nested_column.size() != output_column->size()) {
return Status::InternalError(
"nested_column size is changed, nested_column_size={}, "
"ouput_column_size={}; input_column={}",
nested_column.size(), output_column->size(), input_column->get_name());
}

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= new_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Some data is changed to null");
return Status::DataQualityError(
"some not null data is changed to null, intput_column={}",
input_column->get_name());
}
}
}

if (ref_column->is_nullable() && new_column->is_nullable()) {
if (input_column->is_nullable() && output_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();
const auto* new_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= (ref_null_map[i] != new_null_map[i]);
}
if (is_changed) {
return Status::DataQualityError("is_null of data is changed!");
return Status::DataQualityError(
"null map is changed after calculation, input_column={}",
input_column->get_name());
}
}
return Status::OK();
Expand Down Expand Up @@ -1203,6 +1222,8 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;

column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name());

if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) {
auto mv_param = materialized_function_map.find(column_name_lower)->second;
column_mapping->expr = mv_param.expr;
Expand All @@ -1211,9 +1232,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
}
}

int32_t column_index = base_tablet_schema->field_index(new_column.name());
if (column_index >= 0) {
column_mapping->ref_column = column_index;
if (column_mapping->ref_column_idx >= 0) {
continue;
}

Expand All @@ -1236,7 +1255,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
return Status::InternalError("failed due to operate on shadow column");
}
// Newly added column go here
column_mapping->ref_column = -1;
column_mapping->ref_column_idx = -1;

if (i < base_tablet_schema->num_short_key_columns()) {
*sc_directly = true;
Expand Down Expand Up @@ -1265,7 +1284,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
continue;
}

if (column_mapping->ref_column != i - num_default_value) {
if (column_mapping->ref_column_idx != i - num_default_value) {
*sc_sorting = true;
return Status::OK();
}
Expand Down Expand Up @@ -1332,9 +1351,9 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
if (column_mapping->expr != nullptr) {
*sc_directly = true;
return Status::OK();
} else if (column_mapping->ref_column >= 0) {
} else if (column_mapping->ref_column_idx >= 0) {
const auto& column_new = new_tablet_schema->column(i);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column_idx);
// index changed
if (column_new.is_bf_column() != column_old.is_bf_column() ||
column_new.has_bitmap_index() != column_old.has_bitmap_index() ||
Expand Down

0 comments on commit 057b768

Please sign in to comment.