Skip to content

Commit b6fd286

Browse files
committed
ci
1 parent 0131037 commit b6fd286

File tree

6 files changed

+44
-21
lines changed

6 files changed

+44
-21
lines changed

dbms/src/Storages/DeltaMerge/Delta/ColumnFileFlushTask.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ DeltaIndex::Updates ColumnFileFlushTask::prepare(WriteBatches & wbs)
4343
{
4444
if (!task.block_data)
4545
continue;
46-
46+
/*
4747
IColumn::Permutation perm;
4848
task.sorted = sortBlockByPk(getExtraHandleColumnDefine(context.is_common_handle), task.block_data, perm);
4949
if (task.sorted)
5050
delta_index_updates.emplace_back(task.deletes_offset, task.rows_offset, perm);
51-
51+
*/
5252
task.data_page = ColumnFileTiny::writeColumnFileData(context, task.block_data, 0, task.block_data.rows(), wbs);
5353
}
5454

dbms/src/Storages/DeltaMerge/VersionChain/DMFileHandleIndex.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class DMFileHandleIndex
5959
return start_row_id + *row_id;
6060
}
6161

62-
void calculateReadPacks(const PaddedPODArray<Handle> & handles)
62+
void calculateReadPacks(const std::span<const Handle> handles)
6363
{
6464
std::vector<UInt8> calc_read_packs(pack_range.count(), 0);
6565
UInt32 calc_read_count = 0;

dbms/src/Storages/DeltaMerge/VersionChain/VersionChain.cpp

+7-5
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,11 @@ UInt32 VersionChain<Handle>::replayBlock(
127127
auto cf_reader = cf.getReader(dm_context, data_provider, getHandleColumnDefinesPtr<Handle>(), ReadTag::MVCC);
128128
auto block = cf_reader->readNextBlock();
129129
RUNTIME_CHECK_MSG(!cf_reader->readNextBlock(), "{}: read all rows in one block is required!", cf.toString());
130-
const auto * handles_ptr = toColumnVectorDataPtr<Int64>(block.begin()->column);
131-
RUNTIME_CHECK_MSG(handles_ptr != nullptr, "TODO: support common handle");
132-
const auto & handles = *handles_ptr;
130+
const auto * handle_col = toColumnVectorDataPtr<Int64>(block.begin()->column);
131+
RUNTIME_CHECK_MSG(handle_col != nullptr, "TODO: support common handle");
132+
RUNTIME_CHECK(handle_col->size() > offset, handle_col->size(), offset);
133+
const std::span<const Handle> handles{
134+
handle_col->data() + offset, handle_col->size() - offset};
133135

134136
if (calculate_read_packs)
135137
calculateReadPacks(handles);
@@ -151,7 +153,7 @@ UInt32 VersionChain<Handle>::replayBlock(
151153
new_handle_to_row_ids->insert(std::make_pair(h, curr_row_id));
152154
base_versions->push_back(NotExistRowID);
153155
}
154-
return handles.size() - offset;
156+
return handles.size();
155157
}
156158

157159
template <Int64OrString Handle>
@@ -206,7 +208,7 @@ std::optional<RowID> VersionChain<Handle>::findBaseVersionFromDMFileOrDeleteRang
206208
}
207209

208210
template <Int64OrString Handle>
209-
void VersionChain<Handle>::calculateReadPacks(const PaddedPODArray<Handle> & handles)
211+
void VersionChain<Handle>::calculateReadPacks(const std::span<const Handle> handles)
210212
{
211213
assert(dmfile_or_delete_range_list->size() == 1);
212214
auto & dmfile_index = std::get<DMFileHandleIndex<Handle>>(dmfile_or_delete_range_list->front());

dbms/src/Storages/DeltaMerge/VersionChain/VersionChain.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#pragma once
1616

17+
#include <span>
1718
#include <Storages/DeltaMerge/ColumnFile/ColumnFileDataProvider.h>
1819
#include <Storages/DeltaMerge/VersionChain/Common.h>
1920
#include <Storages/DeltaMerge/VersionChain/DMFileHandleIndex.h>
@@ -69,7 +70,7 @@ class VersionChain
6970
[[nodiscard]] UInt32 replayDeleteRange(const ColumnFileDeleteRange & cf_delete_range);
7071

7172
[[nodiscard]] std::optional<RowID> findBaseVersionFromDMFileOrDeleteRangeList(Handle h);
72-
void calculateReadPacks(const PaddedPODArray<Handle> & handles);
73+
void calculateReadPacks(const std::span<const Handle> handles);
7374
void cleanHandleColumn();
7475

7576
DISALLOW_COPY_AND_MOVE(VersionChain);

dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store.cpp

+23-12
Original file line numberDiff line numberDiff line change
@@ -860,7 +860,8 @@ CATCH
860860
TEST_P(DeltaMergeStoreRWTest, WriteMultipleBlock)
861861
try
862862
{
863-
const size_t num_write_rows = 32;
863+
constexpr size_t num_write_rows = 32;
864+
constexpr bool clear_data_in_range = true;
864865

865866
// Test write multi blocks without overlap
866867
{
@@ -888,7 +889,7 @@ try
888889
auto file_ids = file_ids1;
889890
file_ids.insert(file_ids.cend(), file_ids2.begin(), file_ids2.end());
890891
file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end());
891-
store->ingestFiles(dm_context, range, file_ids, false);
892+
store->ingestFiles(dm_context, range, file_ids, !clear_data_in_range);
892893
break;
893894
}
894895
case TestMode::PageStorageV2_MemoryAndDisk:
@@ -900,7 +901,7 @@ try
900901
auto range = range1.merge(range3);
901902
auto file_ids = file_ids1;
902903
file_ids.insert(file_ids.cend(), file_ids3.begin(), file_ids3.end());
903-
store->ingestFiles(dm_context, range, file_ids, false);
904+
store->ingestFiles(dm_context, range, file_ids, !clear_data_in_range);
904905

905906
store->write(*db_context, db_context->getSettingsRef(), block2);
906907
break;
@@ -938,7 +939,7 @@ try
938939

939940
// Test write multi blocks with overlap
940941
{
941-
UInt64 tso1 = 1;
942+
UInt64 tso1 = 3; // ts of the same key should incre...
942943
UInt64 tso2 = 100;
943944
Block block1 = DMTestEnv::prepareSimpleWriteBlock(0, 1 * num_write_rows, false, tso1);
944945
Block block2 = DMTestEnv::prepareSimpleWriteBlock(1 * num_write_rows, 2 * num_write_rows, false, tso1);
@@ -960,11 +961,11 @@ try
960961
{
961962
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
962963
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
963-
store->ingestFiles(dm_context, range1, {file_ids1}, false);
964+
store->ingestFiles(dm_context, range1, {file_ids1}, clear_data_in_range);
964965
auto [range2, file_ids2] = genDMFile(*dm_context, block2);
965-
store->ingestFiles(dm_context, range2, {file_ids2}, false);
966+
store->ingestFiles(dm_context, range2, {file_ids2}, clear_data_in_range);
966967
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
967-
store->ingestFiles(dm_context, range3, {file_ids3}, false);
968+
store->ingestFiles(dm_context, range3, {file_ids3}, clear_data_in_range);
968969
break;
969970
}
970971
case TestMode::PageStorageV2_MemoryAndDisk:
@@ -974,9 +975,9 @@ try
974975

975976
auto dm_context = store->newDMContext(*db_context, db_context->getSettingsRef());
976977
auto [range1, file_ids1] = genDMFile(*dm_context, block1);
977-
store->ingestFiles(dm_context, range1, {file_ids1}, false);
978+
store->ingestFiles(dm_context, range1, {file_ids1}, clear_data_in_range);
978979
auto [range3, file_ids3] = genDMFile(*dm_context, block3);
979-
store->ingestFiles(dm_context, range3, {file_ids3}, false);
980+
store->ingestFiles(dm_context, range3, {file_ids3}, clear_data_in_range);
980981
break;
981982
}
982983
}
@@ -1018,8 +1019,8 @@ try
10181019
db_context->getSettingsRef(),
10191020
columns,
10201021
{RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize())},
1021-
/* num_streams= */ 1,
1022-
/* start_ts= */ static_cast<UInt64>(1),
1022+
/* num_streams= */ 2,
1023+
/* start_ts= */ static_cast<UInt64>(2),
10231024
EMPTY_FILTER,
10241025
std::vector<RuntimeFilterPtr>{},
10251026
0,
@@ -1031,7 +1032,7 @@ try
10311032
in,
10321033
Strings({DMTestEnv::pk_name}),
10331034
createColumns({
1034-
createColumn<Int64>(createNumbers<Int64>(0, 2 * num_write_rows)),
1035+
createColumn<Int64>(createNumbers<Int64>(64, 96)),
10351036
}));
10361037
}
10371038
}
@@ -3903,6 +3904,16 @@ CATCH
39033904

39043905
void DeltaMergeStoreRWTest::dupHandleVersionAndDeltaIndexAdvancedThanSnapshot()
39053906
{
3907+
// Always use delta index in this case.
3908+
auto & global_settings = db_context->getGlobalContext().getSettingsRef();
3909+
bool enable_version_chain = global_settings.dt_enable_version_chain;
3910+
if (enable_version_chain)
3911+
global_settings.set("dt_enable_version_chain", "false");
3912+
SCOPE_EXIT({
3913+
if (enable_version_chain)
3914+
global_settings.set("dt_enable_version_chain", "true");
3915+
});
3916+
39063917
auto table_column_defines = DMTestEnv::getDefaultColumns();
39073918
store = reload(table_column_defines);
39083919

dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,15 @@ class SegmentReadTaskTest : public SegmentTestBasic
9090
// hack to change the "immutable" delta-index on delta-snapshot for testing
9191
(*const_cast<DeltaIndexPtr *>(&first_snap->delta->getSharedDeltaIndex())) = broken_delta_index;
9292

93+
// Always use delta index in this case.
94+
auto & global_settings = db_context->getGlobalContext().getSettingsRef();
95+
bool enable_version_chain = global_settings.dt_enable_version_chain;
96+
if (enable_version_chain)
97+
global_settings.set("dt_enable_version_chain", "false");
98+
SCOPE_EXIT({
99+
if (enable_version_chain)
100+
global_settings.set("dt_enable_version_chain", "true");
101+
});
93102
auto task = std::make_shared<DM::SegmentReadTask>(
94103
first,
95104
first_snap,

0 commit comments

Comments
 (0)