Skip to content

Commit

Permalink
[fix](schema-change) Fix wrong input column for cast validity check
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 committed Aug 5, 2024
1 parent 455018b commit a2baaf3
Show file tree
Hide file tree
Showing 4 changed files with 128 additions and 63 deletions.
4 changes: 2 additions & 2 deletions be/src/olap/column_mapping.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@ struct ColumnMapping {
ColumnMapping() = default;
virtual ~ColumnMapping() = default;

bool has_reference() const { return expr != nullptr || ref_column >= 0; }
bool has_reference() const { return expr != nullptr || ref_column_idx >= 0; }

// <0: use default value
// >=0: use origin column
int32_t ref_column = -1;
int32_t ref_column_idx = -1;
// normally for default value. stores values for filters
WrapperField* default_value = nullptr;
std::shared_ptr<TExpr> expr;
Expand Down
147 changes: 87 additions & 60 deletions be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

#include "olap/schema_change.h"

#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>
#include <thrift/protocol/TDebugProtocol.h>

#include <algorithm>
#include <exception>
#include <map>
Expand Down Expand Up @@ -285,52 +289,63 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
vectorized::VExprContext::filter_block(ctx.get(), ref_block, ref_block->columns()));
}

const int row_size = ref_block->rows();
const int column_size = new_block->columns();
const int row_num = ref_block->rows();
const int new_schema_cols_num = new_block->columns();

// swap ref_block[key] and new_block[value]
// will be used for swapping ref_block[entry.first] and new_block[entry.second]
std::list<std::pair<int, int>> swap_idx_list;
for (int idx = 0; idx < column_size; idx++) {
// just for MV, schema change should not run into this branch
if (_schema_mapping[idx].expr != nullptr) {
for (int idx = 0; idx < new_schema_cols_num; idx++) {
auto expr = _schema_mapping[idx].expr;
if (expr != nullptr) {
vectorized::VExprContextSPtr ctx;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*_schema_mapping[idx].expr, ctx));
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*expr, ctx));
RETURN_IF_ERROR(ctx->prepare(state.get(), row_desc));
RETURN_IF_ERROR(ctx->open(state.get()));

int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
if (ref_block->get_by_position(result_column_id).column == nullptr) {
int result_tmp_column_idx = -1;
RETURN_IF_ERROR(ctx->execute(ref_block, &result_tmp_column_idx));
auto& result_tmp_column_def = ref_block->get_by_position(result_tmp_column_idx);
if (result_tmp_column_def.column == nullptr) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} result column is nullptr",
ref_block->get_by_position(result_column_id).name);
"result column={} is nullptr, input expr={}", result_tmp_column_def.name,
apache::thrift::ThriftDebugString(*expr));
}
ref_block->replace_by_position_if_const(result_column_id);
ref_block->replace_by_position_if_const(result_tmp_column_idx);

if (ref_block->get_by_position(result_column_id).column->size() != row_size) {
if (result_tmp_column_def.column->size() != row_num) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"{} size invalid, expect={}, real={}", new_block->get_by_position(idx).name,
row_size, ref_block->get_by_position(result_column_id).column->size());
"result size invalid, expect={}, real={}; input expr={}", row_num,
result_tmp_column_def.column->size(),
apache::thrift::ThriftDebugString(*expr));
}

if (_type == SCHEMA_CHANGE) {
// dangerous casts (expected to be rejected by the upstream caller) may turn data into null and cause data loss in schema change
// for rollup, this check is unnecessary; ref columns are not set in that case, since it works on exprs

// column_idx in base schema
int32_t ref_column_idx = _schema_mapping[idx].ref_column_idx;
DCHECK_GE(ref_column_idx, 0);
auto& ref_column_def = ref_block->get_by_position(ref_column_idx);
RETURN_IF_ERROR(
_check_cast_valid(ref_column_def.column, result_tmp_column_def.column));
}
RETURN_IF_ERROR(_check_cast_valid(ref_block->get_by_position(idx).column,
ref_block->get_by_position(result_column_id).column,
_type));
swap_idx_list.emplace_back(result_column_id, idx);
} else if (_schema_mapping[idx].ref_column < 0) {
swap_idx_list.emplace_back(result_tmp_column_idx, idx);
} else if (_schema_mapping[idx].ref_column_idx < 0) {
// new column, write default value
auto* value = _schema_mapping[idx].default_value;
auto column = new_block->get_by_position(idx).column->assume_mutable();
if (value->is_null()) {
DCHECK(column->is_nullable());
column->insert_many_defaults(row_size);
column->insert_many_defaults(row_num);
} else {
auto type_info = get_type_info(_schema_mapping[idx].new_column);
DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
value->ptr(), column, row_size);
value->ptr(), column, row_num);
}
} else {
// same type, just swap column
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column, idx);
swap_idx_list.emplace_back(_schema_mapping[idx].ref_column_idx, idx);
}
}

Expand Down Expand Up @@ -368,78 +383,90 @@ Status BlockChanger::change_block(vectorized::Block* ref_block,
return Status::OK();
}

// This check is for MV to prevent schema-change from causing data loss
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column, AlterTabletType type) {
if (ref_column->size() != new_column->size()) {
// This check can prevent schema-change from causing data loss after type cast
Status BlockChanger::_check_cast_valid(vectorized::ColumnPtr input_column,
vectorized::ColumnPtr output_column) {
if (input_column->size() != output_column->size()) {
return Status::InternalError(
"column size is changed, ref_column_size={}, new_column_size={}",
ref_column->size(), new_column->size());
}
if (type == ROLLUP) {
return Status::OK();
"column size is changed, input_column_size={}, output_column_size={}; "
"input_column={}",
input_column->size(), output_column->size(), input_column->get_name());
}
if (ref_column->is_nullable() != new_column->is_nullable()) {
if (ref_column->is_nullable()) {
DCHECK_EQ(input_column->size(), output_column->size())
<< "length check should have done before calling this function!";

if (input_column->is_nullable() != output_column->is_nullable()) {
if (input_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= ref_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Null data is changed to not nullable");
return Status::DataQualityError(
"some null data is changed to not null, intput_column={}",
input_column->get_name());
}
} else {
const auto& null_map_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column();
const auto& nested_column =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_nested_column();
const auto* new_null_map = null_map_column.get_data().data();

if (null_map_column.size() != new_column->size() ||
nested_column.size() != new_column->size()) {
DCHECK(false);
if (null_map_column.size() != output_column->size()) {
return Status::InternalError(
"null_map_column size is changed, null_map_column_size={}, "
"new_column_size={}",
null_map_column.size(), new_column->size());
"null_map_column size mismatch output_column_size, "
"null_map_column_size={}, output_column_size={}; input_column={}",
null_map_column.size(), output_column->size(), input_column->get_name());
}

if (nested_column.size() != output_column->size()) {
return Status::InternalError(
"nested_column size is changed, nested_column_size={}, "
"ouput_column_size={}; input_column={}",
nested_column.size(), output_column->size(), input_column->get_name());
}

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= new_null_map[i];
}
if (is_changed) {
return Status::DataQualityError("Some data is changed to null");
return Status::DataQualityError(
"some not null data is changed to null, intput_column={}",
input_column->get_name());
}
}
}

if (ref_column->is_nullable() && new_column->is_nullable()) {
if (input_column->is_nullable() && output_column->is_nullable()) {
const auto* ref_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(input_column)
->get_null_map_column()
.get_data()
.data();
const auto* new_null_map =
vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
vectorized::check_and_get_column<vectorized::ColumnNullable>(output_column)
->get_null_map_column()
.get_data()
.data();

bool is_changed = false;
for (size_t i = 0; i < ref_column->size(); i++) {
for (size_t i = 0; i < input_column->size(); i++) {
is_changed |= (ref_null_map[i] != new_null_map[i]);
}
if (is_changed) {
return Status::DataQualityError("is_null of data is changed!");
return Status::DataQualityError(
"null map is changed after calculation, input_column={}",
input_column->get_name());
}
}
return Status::OK();
Expand Down Expand Up @@ -1203,6 +1230,8 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
ColumnMapping* column_mapping = changer->get_mutable_column_mapping(i);
column_mapping->new_column = &new_column;

column_mapping->ref_column_idx = base_tablet_schema->field_index(new_column.name());

if (materialized_function_map.find(column_name_lower) != materialized_function_map.end()) {
auto mv_param = materialized_function_map.find(column_name_lower)->second;
column_mapping->expr = mv_param.expr;
Expand All @@ -1211,9 +1240,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
}
}

int32_t column_index = base_tablet_schema->field_index(new_column.name());
if (column_index >= 0) {
column_mapping->ref_column = column_index;
if (column_mapping->ref_column_idx >= 0) {
continue;
}

Expand All @@ -1236,7 +1263,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
return Status::InternalError("failed due to operate on shadow column");
}
// Newly added column go here
column_mapping->ref_column = -1;
column_mapping->ref_column_idx = -1;

if (i < base_tablet_schema->num_short_key_columns()) {
*sc_directly = true;
Expand Down Expand Up @@ -1265,7 +1292,7 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
continue;
}

if (column_mapping->ref_column != i - num_default_value) {
if (column_mapping->ref_column_idx != i - num_default_value) {
*sc_sorting = true;
return Status::OK();
}
Expand Down Expand Up @@ -1332,9 +1359,9 @@ Status SchemaChangeJob::parse_request(const SchemaChangeParams& sc_params,
if (column_mapping->expr != nullptr) {
*sc_directly = true;
return Status::OK();
} else if (column_mapping->ref_column >= 0) {
} else if (column_mapping->ref_column_idx >= 0) {
const auto& column_new = new_tablet_schema->column(i);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column);
const auto& column_old = base_tablet_schema->column(column_mapping->ref_column_idx);
// index changed
if (column_new.is_bf_column() != column_old.is_bf_column() ||
column_new.has_bitmap_index() != column_old.has_bitmap_index() ||
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/schema_change.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class BlockChanger {

private:
static Status _check_cast_valid(vectorized::ColumnPtr ref_column,
vectorized::ColumnPtr new_column, AlterTabletType type);
vectorized::ColumnPtr new_column);

// @brief column-mapping specification of new schema
SchemaMapping _schema_mapping;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression case: change a column's type and move it to a different position
// in a single ALTER statement, then wait for the schema change job to finish.
//
// The statements below are order-dependent:
//   1. create a two-column table (k BIGINT, v SMALLINT NOT NULL) and insert one row;
//   2. add a DATETIME column t2 whose default is NULL;
//   3. widen v from SMALLINT to BIGINT and place it AFTER t2, so the column is
//      both cast and repositioned by the same schema change.
// NOTE(review): presumably t2 is appended after v in step 2, which would make
// v's index in the new schema differ from its index in the base schema; confirm
// against ALTER TABLE ADD COLUMN semantics.
suite("test_move_column_with_cast") {
def tableName = "test_move_column_with_cast"
// Start from a clean state so reruns do not see a previously altered table.
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k BIGINT,
v SMALLINT NOT NULL
) DUPLICATE KEY(`k`)
DISTRIBUTED BY HASH(k) BUCKETS 4
properties("replication_num" = "1");
"""

// One existing row with a non-null v, so the schema change has data to convert.
sql """ INSERT INTO ${tableName} VALUES(1, 1); """
// Nullable column whose value for the existing row is the NULL default.
sql """ ALTER TABLE ${tableName} ADD COLUMN t2 DATETIME DEFAULT NULL; """
// Type change (SMALLINT -> BIGINT) combined with a reorder (v moved after t2).
sql """ ALTER TABLE ${tableName} MODIFY COLUMN v BIGINT AFTER t2; """

// Poll the most recent column-alter job for this table until it completes.
// NOTE(review): `time 600` looks like a timeout in seconds; confirm against the
// waitForSchemaChangeDone helper in the regression framework.
waitForSchemaChangeDone {
sql """SHOW ALTER TABLE COLUMN WHERE IndexName='${tableName}' ORDER BY createtime DESC LIMIT 1"""
time 600
}
}

0 comments on commit a2baaf3

Please sign in to comment.