diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h index 047af1e9d1190b..bf3a6118d76bac 100644 --- a/be/src/olap/column_mapping.h +++ b/be/src/olap/column_mapping.h @@ -30,11 +30,11 @@ struct ColumnMapping { ColumnMapping() = default; virtual ~ColumnMapping() = default; - bool has_reference() const { return expr != nullptr || ref_column >= 0; } + bool has_reference() const { return expr != nullptr || ref_column_idx >= 0; } // <0: use default value // >=0: use origin column - int32_t ref_column = -1; + int32_t ref_column_idx = -1; // normally for default value. stores values for filters WrapperField* default_value = nullptr; std::shared_ptr expr; diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 38dbcf1c429bb4..914790562b7d11 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -17,6 +17,10 @@ #include "olap/schema_change.h" +#include +#include +#include + #include #include #include @@ -285,52 +289,63 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns())); } - const int row_size = ref_block->rows(); - const int column_size = new_block->columns(); + const int row_num = ref_block->rows(); + const int new_schema_cols_num = new_block->columns(); - // swap ref_block[key] and new_block[value] + // will be used for swaping ref_block[entry.first] and new_block[entry.second] std::list> swap_idx_list; - for (int idx = 0; idx < column_size; idx++) { - // just for MV, schema change should not run into this branch - if (_schema_mapping[idx].expr != nullptr) { + for (int idx = 0; idx < new_schema_cols_num; idx++) { + auto expr = _schema_mapping[idx].expr; + if (expr != nullptr) { vectorized::VExprContextSPtr ctx; - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx)); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx)); RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc)); RETURN_IF_ERROR(ctx->open(state.get())); - int result_column_id = -1; - RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id)); - if (ref_block->get_by_position(result_column_id).column == nullptr) { + int result_tmp_column_idx = -1; + RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx)); + auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx); + if (result_tmp_column_def.column == nullptr) { return Status::Error( - "{} result column is nullptr", - ref_block->get_by_position(result_column_id).name); + "result column={} is nullptr, input expr={}", result_tmp_column_def.name, + apache::thrift::ThriftDebugString(*expr)); } - ref_block->replace_by_position_if_const(result_column_id); + ref_block->replace_by_position_if_const(result_tmp_column_idx); - if (ref_block->get_by_position(result_column_id).column->size() != row_size) { + if (result_tmp_column_def.column->size() != row_num) { return Status::Error( - "{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name, - row_size, ref_block->get_by_position(result_column_id).column->size()); + "result size invalid, expect={}, real={}; input expr={}", row_num, + result_tmp_column_def.column->size(), + apache::thrift::ThriftDebugString(*expr)); + } + + if (_type == SCHEMA_CHANGE) { + // danger casts (expected to be rejected by upstream caller) may cause data to be null and result in data loss in schema change + // for rollup, this check is unecessary, and ref columns are not set in this case, it works on exprs + + // column_idx in base schema + int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx; + DCHECK_GE(ref_column_idx, 0); + auto& ref_column_def = ref_block->get_by_position(ref_column_idx); + RETURN_IF_ERROR( + _check_cast_valid(ref_column_def.column, result_tmp_column_def.column)); } - RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column, - ref_block->get_by_position(result_column_id).column, - _type)); - swap_idx_list.emplace_back(result_column_id, idx); - } else if (_schema_mapping[idx].ref_column < 0) { + swap_idx_list.emplace_back(result_tmp_column_idx, idx); + } else if (_schema_mapping[idx].ref_column_idx < 0) { // new column, write default value auto* value = _schema_mapping[idx].default_value; auto column = new_block->get_by_position(idx).column->assume_mutable(); if (value->is_null()) { DCHECK(column->is_nullable()); - column->insert_many_defaults(row_size); + column->insert_many_defaults(row_num); } else { auto type_info = get_type_info(_schema_mapping[idx].new_column); DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(), - value->ptr(), column, row_size); + value->ptr(), column, row_num); } } else { // same type, just swap column - swap_idx_list.emplace_back(_schema_mapping[idx].ref_column, idx); + swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx); } } @@ -368,78 +383,90 @@ Status BlockChanger::change_block(vectorized::Block* ref_block, return Status::OK(); } -// This check is for MV to prevent schema-change from causing data loss -Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column, - vectorized::ColumnPtr new_column, AlterTabletType type) { - if (ref_column->size() != new_column->size()) { +// This check can prevent schema-change from causing data loss after type cast +Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column, + vectorized::ColumnPtr output_column) { + if (input_column->size() != output_column->size()) { return Status::InternalError( - "column size is changed, ref_column_size={}, new_column_size={}", - ref_column->size(), new_column->size()); - } - if (type == ROLLUP) { - return Status::OK(); + "column size is changed, input_column_size={}, output_column_size={}; " + "input_column={}", + input_column->size(), output_column->size(), input_column->get_name()); } - if (ref_column->is_nullable() != new_column->is_nullable()) { - if (ref_column->is_nullable()) { + DCHECK_EQ(input_column->size(), output_column->size()) + << "length check should have done before calling this function!"; + + if (input_column->is_nullable() != output_column->is_nullable()) { + if (input_column->is_nullable()) { const auto* ref_null_map = - vectorized::check_and_get_column(ref_column) + vectorized::check_and_get_column(input_column) ->get_null_map_column() .get_data() .data(); bool is_changed = false; - for (size_t i = 0; i < ref_column->size(); i++) { + for (size_t i = 0; i < input_column->size(); i++) { is_changed |= ref_null_map[i]; } if (is_changed) { - return Status::DataQualityError("Null data is changed to not nullable"); + return Status::DataQualityError( + "some null data is changed to not null, intput_column={}", + input_column->get_name()); } } else { const auto& null_map_column = - vectorized::check_and_get_column(new_column) + vectorized::check_and_get_column(output_column) ->get_null_map_column(); const auto& nested_column = - vectorized::check_and_get_column(new_column) + vectorized::check_and_get_column(output_column) ->get_nested_column(); const auto* new_null_map = null_map_column.get_data().data(); - if (null_map_column.size() != new_column->size() || - nested_column.size() != new_column->size()) { - DCHECK(false); + if (null_map_column.size() != output_column->size()) { return Status::InternalError( - "null_map_column size is changed, null_map_column_size={}, " - "new_column_size={}", - null_map_column.size(), new_column->size()); + "null_map_column size mismatch output_column_size, " + "null_map_column_size={}, output_column_size={}; input_column={}", + null_map_column.size(), output_column->size(), input_column->get_name()); + } + + if (nested_column.size() != output_column->size()) { + return Status::InternalError( + "nested_column size is changed, nested_column_size={}, " + "ouput_column_size={}; input_column={}", + nested_column.size(), output_column->size(), input_column->get_name()); } bool is_changed = false; - for (size_t i = 0; i < ref_column->size(); i++) { + for (size_t i = 0; i < input_column->size(); i++) { is_changed |= new_null_map[i]; } if (is_changed) { - return Status::DataQualityError("Some data is changed to null"); + return Status::DataQualityError( + "some not null data is changed to null, intput_column={}", + input_column->get_name()); } } } - if (ref_column->is_nullable() && new_column->is_nullable()) { + if (input_column->is_nullable() && output_column->is_nullable()) { const auto* ref_null_map = - vectorized::check_and_get_column(ref_column) + vectorized::check_and_get_column(input_column) ->get_null_map_column() .get_data() .data(); const auto* new_null_map = - vectorized::check_and_get_column(new_column) + vectorized::check_and_get_column(output_column) ->get_null_map_column() .get_data() .data(); bool is_changed = false; - for (size_t i = 0; i < ref_column->size(); i++) { + for (size_t i = 0; i < input_column->size(); i++) { is_changed |= (ref_null_map[i] != new_null_map[i]); } if (is_changed) { - return Status::DataQualityError("is_null of data is changed!"); + return Status::DataQualityError( + "null map is changed after calculation, input_column={}", + input_column->get_name()); } } return Status::OK(); @@ -1203,6 +1230,8 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i); column_mapping->new_column = &new_column; + column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name()); + if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) { auto mv_param = materialized_function_map.find(column_name_lower)->second; column_mapping->expr = mv_param.expr; @@ -1211,9 +1240,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, } } - int32_t column_index = base_tablet_schema->field_index(new_column.name()); - if (column_index >= 0) { - column_mapping->ref_column = column_index; + if (column_mapping->ref_column_idx >= 0) { continue; } @@ -1236,7 +1263,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, return Status::InternalError("failed due to operate on shadow column"); } // Newly added column go here - column_mapping->ref_column = -1; + column_mapping->ref_column_idx = -1; if (i < base_tablet_schema->num_short_key_columns()) { *sc_directly = true; @@ -1265,7 +1292,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, continue; } - if (column_mapping->ref_column != i - num_default_value) { + if (column_mapping->ref_column_idx != i - num_default_value) { *sc_sorting = true; return Status::OK(); } @@ -1332,9 +1359,9 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params, if (column_mapping->expr != nullptr) { *sc_directly = true; return Status::OK(); - } else if (column_mapping->ref_column >= 0) { + } else if (column_mapping->ref_column_idx >= 0) { const auto& column_new = new_tablet_schema->column(i); - const auto& column_old = base_tablet_schema->column(column_mapping->ref_column); + const auto& column_old = base_tablet_schema->column(column_mapping->ref_column_idx); // index changed if (column_new.is_bf_column() != column_old.is_bf_column() || column_new.has_bitmap_index() != column_old.has_bitmap_index() || diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h index 64ab0c724d0345..c29cb49a7aaece 100644 --- a/be/src/olap/schema_change.h +++ b/be/src/olap/schema_change.h @@ -87,7 +87,7 @@ class BlockChanger { private: static Status _check_cast_valid(vectorized::ColumnPtr ref_column, - vectorized::ColumnPtr new_column, AlterTabletType type); + vectorized::ColumnPtr new_column); // @brief column-mapping specification of new schema SchemaMapping _schema_mapping; diff --git a/regression-test/suites/schema_change_p0/test_move_column_with_cast.groovy b/regression-test/suites/schema_change_p0/test_move_column_with_cast.groovy new file mode 100644 index 00000000000000..e89542b62854fd --- /dev/null +++ b/regression-test/suites/schema_change_p0/test_move_column_with_cast.groovy @@ -0,0 +1,38 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_move_column_with_cast") { + def tableName = "test_move_column_with_cast" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + k BIGINT, + v SMALLINT NOT NULL + ) DUPLICATE KEY(`k`) + DISTRIBUTED BY HASH(k) BUCKETS 4 + properties("replication_num" = "1"); + """ + + sql """ INSERT INTO ${tableName} VALUES(1, 1); """ + sql """ ALTER TABLE ${tableName} ADD COLUMN t2 DATETIME DEFAULT NULL; """ + sql """ ALTER TABLE ${tableName} MODIFY COLUMN v BIGINT AFTER t2; """ + + waitForSchemaChangeDone { + sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1""" + time 600 + } +} \ No newline at end of file