Skip to content

Commit

Permalink
[fix](schema-change) Rebuild distribution info after columns as hash …
Browse files Browse the repository at this point in the history
…key are modified
  • Loading branch information
TangSiyang2001 committed Aug 13, 2024
1 parent 1f87ecf commit 705c51b
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3052,6 +3052,7 @@ public void updateBaseIndexSchema(OlapTable olapTable, Map<Long, LinkedList<Colu
}
olapTable.setIndexes(indexes);
olapTable.rebuildFullSchema();
olapTable.rebuildDistributionInfo();
}

public void replayModifyTableAddOrDropInvertedIndices(TableAddOrDropInvertedIndicesInfo info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ private void onFinished(OlapTable tbl) {
}
// rebuild table's full schema
tbl.rebuildFullSchema();
tbl.rebuildDistributionInfo();

// update bloom filter
if (hasBfChange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,8 @@ public String toString() {
public RandomDistributionInfo toRandomDistributionInfo() {
return new RandomDistributionInfo(bucketNum);
}

public void setDistributionColumns(List<Column> column) {
this.distributionColumns = column;
}
}
24 changes: 24 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,30 @@ public void rebuildFullSchema() {
}
}

public void rebuildDistributionInfo() {
if (!Objects.equals(defaultDistributionInfo.getType(), DistributionInfoType.HASH)) {
return;
}
HashDistributionInfo distributionInfo = (HashDistributionInfo) defaultDistributionInfo;
Set<String> originalColumnsNames =
distributionInfo.getDistributionColumns()
.stream()
.map(Column::getName)
.collect(Collectors.toSet());

List<Column> newDistributionColumns = getBaseSchema()
.stream()
.filter(column -> originalColumnsNames.contains(column.getName()))
.map(Column::new)
.collect(Collectors.toList());
distributionInfo.setDistributionColumns(newDistributionColumns);

getPartitions()
.stream()
.map(Partition::getDistributionInfo)
.forEach(info -> ((HashDistributionInfo) info).setDistributionColumns(newDistributionColumns));
}

public boolean deleteIndexInfo(String indexName) {
if (!indexNameToId.containsKey(indexName)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_dynamic_partition_mod_distribution_key") {
def tableName = "test_dynamic_partition_mod_distribution_key"
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
CREATE TABLE IF NOT EXISTS ${tableName} (
k1 DATE NOT NULL,
k2 VARCHAR(20) NOT NULL,
v INT NOT NULL
) DUPLICATE KEY (k1)
PARTITION BY RANGE(k1) ()
DISTRIBUTED BY HASH(k1, k2) BUCKETS 1
PROPERTIES (
"dynamic_partition.enable"="true",
"dynamic_partition.end"="3",
"dynamic_partition.buckets"="1",
"dynamic_partition.start"="-3",
"dynamic_partition.prefix"="p",
"dynamic_partition.time_unit"="DAY",
"dynamic_partition.create_history_partition"="true",
"dynamic_partition.replication_allocation" = "tag.location.default: 1")
"""

sql """ alter table ${tableName} modify column k1 comment 'new_comment_for_k1' """
sql """ alter table ${tableName} modify column k2 varchar(255) """
waitForSchemaChangeDone {
sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${tableName}' ORDER BY createtime DESC LIMIT 1 """
time 600
}
sql """ ADMIN SET FRONTEND CONFIG ('dynamic_partition_check_interval_seconds' = '1') """
sql """ alter table ${tableName} set('dynamic_partition.end'='5') """
result = sql "show partitions from ${tableName}"
for (def retry = 0; retry < 10; retry++) { // at most wait 120s
if (result.size() == 9) {
break;
}
logger.info("wait dynamic partition scheduler, sleep 1s")
sleep(1000) // sleep 1s
result = sql "show partitions from ${tableName}"
}
assertEquals(9, result.size())
}

0 comments on commit 705c51b

Please sign in to comment.