Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator support batch serialize #9777

Open
wants to merge 66 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
b869b2f
new hash
guo-shaoge Nov 25, 2024
e8a2df8
prefetch done
guo-shaoge Nov 26, 2024
b314166
executeImplBatchStringHashMap done
guo-shaoge Nov 27, 2024
ec6e892
handle resize exception done
guo-shaoge Nov 27, 2024
9dc702d
tmp save
guo-shaoge Nov 27, 2024
ce1f767
revert Serialized Key changes
guo-shaoge Nov 28, 2024
8ac8beb
refine
guo-shaoge Nov 28, 2024
0053ce8
refine
guo-shaoge Nov 28, 2024
fcf8ed2
fix unit test
guo-shaoge Nov 29, 2024
ae7b969
refine
guo-shaoge Dec 2, 2024
3a86617
unit test
guo-shaoge Dec 2, 2024
623fef5
prefetch
guo-shaoge Dec 2, 2024
19f320d
fix
guo-shaoge Dec 2, 2024
3a226df
refine
guo-shaoge Dec 3, 2024
c44ace7
refine
guo-shaoge Dec 3, 2024
3e30f95
revert new hasher
guo-shaoge Dec 3, 2024
ea85d19
debug low distinct value
guo-shaoge Dec 3, 2024
16937ff
Revert "revert new hasher"
guo-shaoge Dec 3, 2024
d2fba57
refine original code path
guo-shaoge Dec 3, 2024
71b6ecd
Reapply "revert new hasher"
guo-shaoge Dec 3, 2024
40ceb08
one level old hash; two level new hash
guo-shaoge Dec 3, 2024
4cb24c2
Revert "one level old hash; two level new hash"
guo-shaoge Dec 4, 2024
c02cf71
revert new hasher; refine original code path
guo-shaoge Dec 4, 2024
352b710
fix case
guo-shaoge Dec 5, 2024
e5ab87c
refine
guo-shaoge Dec 13, 2024
30d99be
is_serialized_key
guo-shaoge Dec 15, 2024
01cae80
fix
guo-shaoge Dec 15, 2024
95c597d
fix start_row
guo-shaoge Dec 15, 2024
83fb879
Merge branch 'master' into hashagg_prefetch
guo-shaoge Dec 16, 2024
e0dea79
revert prefetch for StringHashTable
guo-shaoge Dec 17, 2024
143fee3
comment
guo-shaoge Dec 17, 2024
6cea464
fmt
guo-shaoge Dec 18, 2024
6750737
comment
guo-shaoge Dec 19, 2024
2e94829
prefetch agg data
guo-shaoge Dec 20, 2024
dc0973a
try fix ci
guo-shaoge Dec 22, 2024
9623368
prefetch threshold(2MB); mini batch; prefetch exists key; prefetch in…
guo-shaoge Dec 24, 2024
0ef6ba6
clean code
guo-shaoge Dec 24, 2024
49d4188
fix prefetch agg func is 0
guo-shaoge Dec 24, 2024
b4a1bde
fix case
guo-shaoge Dec 24, 2024
738ab2a
refine agg func is zero
guo-shaoge Dec 25, 2024
361f988
refine
guo-shaoge Dec 25, 2024
5158885
refine comment
guo-shaoge Dec 25, 2024
80b7e22
getHash full
guo-shaoge Dec 26, 2024
c41dba4
minor refine
guo-shaoge Dec 26, 2024
ab31beb
hashvals sizeof 16
guo-shaoge Dec 26, 2024
00cc20d
fix case
guo-shaoge Dec 26, 2024
4aec90b
remove macro
guo-shaoge Dec 27, 2024
606e013
remove macro 2
guo-shaoge Dec 27, 2024
a7361e1
fix
guo-shaoge Dec 27, 2024
bdb21d3
refine comment
guo-shaoge Dec 27, 2024
4153592
refine
guo-shaoge Dec 27, 2024
bbd5c45
vec 16
guo-shaoge Dec 29, 2024
6627558
refine
guo-shaoge Jan 7, 2025
530bb27
compute hashvals before each mini batch
guo-shaoge Jan 7, 2025
358e851
handleMiniBatchImpl
guo-shaoge Jan 7, 2025
3d79a20
fmt
guo-shaoge Jan 7, 2025
51a90b5
remove useless code
guo-shaoge Jan 7, 2025
e108584
minor refine
guo-shaoge Jan 7, 2025
761b34d
fmt
guo-shaoge Jan 7, 2025
cfdb189
batch serialize
guo-shaoge Jan 8, 2025
9e19977
key_serialized
guo-shaoge Jan 8, 2025
09c8c3d
fix
guo-shaoge Jan 8, 2025
9a2116d
ArenaKeyHolder instead of SerializedKeyHolder
guo-shaoge Jan 9, 2025
81d76a8
ColumnString str_size use int32
guo-shaoge Jan 9, 2025
65cbc43
very large mini batch
guo-shaoge Jan 9, 2025
997396a
refine
guo-shaoge Jan 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbms/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,18 @@ class AggregateFunctionGroupUniqArrayGeneric
{
// We have to copy the keys to our arena.
assert(arena != nullptr);
cur_set.emplace(ArenaKeyHolder{rhs_elem.getValue(), *arena}, it, inserted);
cur_set.emplace(ArenaKeyHolder{rhs_elem.getValue(), arena}, it, inserted);
}
}

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
auto & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();

auto & set = this->data(place).value;
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + set.size());
offsets_to.push_back((offsets_to.empty() ? 0 : offsets_to.back()) + set.size());

for (auto & elem : set)
deserializeAndInsert<is_plain_column>(elem.getValue(), data_to);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/AggregateFunctions/KeyHolderHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ inline auto getKeyHolder(const IColumn & column, size_t row_num, Arena & arena)
{
if constexpr (is_plain_column)
{
return ArenaKeyHolder{column.getDataAt(row_num), arena};
return ArenaKeyHolder{column.getDataAt(row_num), &arena};
}
else
{
const char * begin = nullptr;
StringRef serialized = column.serializeValueIntoArena(row_num, arena, begin);
assert(serialized.data != nullptr);
return SerializedKeyHolder{serialized, arena};
return SerializedKeyHolder{serialized, &arena};
}
}

Expand Down
64 changes: 62 additions & 2 deletions dbms/src/Columns/ColumnAggregateFunction.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,27 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega

const char * deserializeAndInsertFromArena(const char * src_arena, const TiDB::TiDBCollatorPtr &) override;

void countSerializeByteSizeUnique(
PaddedPODArray<size_t> & /* byte_size */,
const TiDB::TiDBCollatorPtr & /* collator */) const override
{
throw Exception(
"Method countSerializeByteSizeUnique is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void countSerializeByteSize(PaddedPODArray<size_t> & /* byte_size */) const override
{
throw Exception("Method countSerializeByteSize is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void countSerializeByteSizeUniqueForColumnArray(
PaddedPODArray<size_t> & /* byte_size */,
const IColumn::Offsets & /* offsets */,
const TiDB::TiDBCollatorPtr & /* collator */) const override
{
throw Exception(
"Method countSerializeByteSizeUniqueForColumnArray is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void countSerializeByteSizeForColumnArray(
PaddedPODArray<size_t> & /* byte_size */,
const IColumn::Offsets & /* offsets */) const override
Expand All @@ -178,6 +195,16 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
ErrorCodes::NOT_IMPLEMENTED);
}

void serializeToPosUnique(
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
{
throw Exception("Method serializeToPosUnique is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void serializeToPos(
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
Expand All @@ -186,6 +213,19 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
{
throw Exception("Method serializeToPos is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
void serializeToPosUniqueForColumnArray(
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const IColumn::Offsets & /* offsets */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
{
throw Exception(
"Method serializeToPosUniqueForColumnArray is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void serializeToPosForColumnArray(
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
Expand All @@ -198,14 +238,34 @@ class ColumnAggregateFunction final : public COWPtrHelper<IColumn, ColumnAggrega
ErrorCodes::NOT_IMPLEMENTED);
}

void deserializeAndInsertFromPos(PaddedPODArray<char *> & /* pos */, bool /* use_nt_align_buffer */) override
void deserializeAndInsertFromPosUnique(
PaddedPODArray<const char *> & /* pos */,
bool /* use_nt_align_buffer */,
const TiDB::TiDBCollatorPtr & /* collator */) override
{
throw Exception(
"Method deserializeAndInsertFromPosUnique is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void deserializeAndInsertFromPos(PaddedPODArray<const char *> & /* pos */, bool /* use_nt_align_buffer */) override
{
throw Exception(
"Method deserializeAndInsertFromPos is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}

void deserializeAndInsertFromPosUniqueForColumnArray(
PaddedPODArray<const char *> & /* pos */,
const IColumn::Offsets & /* array_offsets */,
bool /* use_nt_align_buffer */,
const TiDB::TiDBCollatorPtr & /* collator */) override
{
throw Exception(
"Method deserializeAndInsertFromPosUniqueForColumnArray is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void deserializeAndInsertFromPosForColumnArray(
PaddedPODArray<char *> & /* pos */,
PaddedPODArray<const char *> & /* pos */,
const IColumn::Offsets & /* array_offsets */,
bool /* use_nt_align_buffer */) override
{
Expand Down
82 changes: 74 additions & 8 deletions dbms/src/Columns/ColumnArray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,21 @@ const char * ColumnArray::deserializeAndInsertFromArena(const char * pos, const
return pos;
}

void ColumnArray::countSerializeByteSizeUnique(
PaddedPODArray<size_t> & byte_size,
const TiDB::TiDBCollatorPtr & collator) const
{
countSerializeByteSizeImpl<true>(byte_size, collator);
}

void ColumnArray::countSerializeByteSize(PaddedPODArray<size_t> & byte_size) const
{
countSerializeByteSizeImpl<false>(byte_size, nullptr);
}

template <bool ensure_unique>
void ColumnArray::countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size, const TiDB::TiDBCollatorPtr & collator)
const
{
RUNTIME_CHECK_MSG(byte_size.size() == size(), "size of byte_size({}) != column size({})", byte_size.size(), size());

Expand All @@ -237,19 +251,41 @@ void ColumnArray::countSerializeByteSize(PaddedPODArray<size_t> & byte_size) con
for (size_t i = 0; i < size; ++i)
byte_size[i] += sizeof(UInt32);

getData().countSerializeByteSizeForColumnArray(byte_size, getOffsets());
if constexpr (ensure_unique)
getData().countSerializeByteSizeUniqueForColumnArray(byte_size, getOffsets(), collator);
else
getData().countSerializeByteSizeForColumnArray(byte_size, getOffsets());
}

void ColumnArray::serializeToPosUnique(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
bool has_null,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const
{
if (has_null)
serializeToPosImpl<true, true>(pos, start, length, collator, sort_key_container);
else
serializeToPosImpl<false, true>(pos, start, length, collator, sort_key_container);
}

void ColumnArray::serializeToPos(PaddedPODArray<char *> & pos, size_t start, size_t length, bool has_null) const
{
if (has_null)
serializeToPosImpl<true>(pos, start, length);
serializeToPosImpl<true, false>(pos, start, length, nullptr, nullptr);
else
serializeToPosImpl<false>(pos, start, length);
serializeToPosImpl<false, false>(pos, start, length, nullptr, nullptr);
}

template <bool has_null>
void ColumnArray::serializeToPosImpl(PaddedPODArray<char *> & pos, size_t start, size_t length) const
template <bool has_null, bool ensure_unique>
void ColumnArray::serializeToPosImpl(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const
{
RUNTIME_CHECK_MSG(length <= pos.size(), "length({}) > size of pos({})", length, pos.size());
RUNTIME_CHECK_MSG(start + length <= size(), "start({}) + length({}) > size of column({})", start, length, size());
Expand All @@ -267,10 +303,37 @@ void ColumnArray::serializeToPosImpl(PaddedPODArray<char *> & pos, size_t start,
pos[i] += sizeof(UInt32);
}

getData().serializeToPosForColumnArray(pos, start, length, has_null, getOffsets());
if constexpr (ensure_unique)
getData().serializeToPosUniqueForColumnArray(
pos,
start,
length,
has_null,
getOffsets(),
collator,
sort_key_container);
else
getData().serializeToPosForColumnArray(pos, start, length, has_null, getOffsets());
}

void ColumnArray::deserializeAndInsertFromPos(PaddedPODArray<char *> & pos, bool use_nt_align_buffer)
void ColumnArray::deserializeAndInsertFromPosUnique(
PaddedPODArray<const char *> & pos,
bool use_nt_align_buffer,
const TiDB::TiDBCollatorPtr & collator)
{
deserializeAndInsertFromPosImpl<true>(pos, use_nt_align_buffer, collator);
}

void ColumnArray::deserializeAndInsertFromPos(PaddedPODArray<const char *> & pos, bool use_nt_align_buffer)
{
deserializeAndInsertFromPosImpl<false>(pos, use_nt_align_buffer, nullptr);
}

template <bool ensure_unique>
void ColumnArray::deserializeAndInsertFromPosImpl(
PaddedPODArray<const char *> & pos,
bool use_nt_align_buffer,
const TiDB::TiDBCollatorPtr & collator)
{
auto & offsets = getOffsets();
size_t prev_size = offsets.size();
Expand All @@ -285,7 +348,10 @@ void ColumnArray::deserializeAndInsertFromPos(PaddedPODArray<char *> & pos, bool
pos[i] += sizeof(UInt32);
}

getData().deserializeAndInsertFromPosForColumnArray(pos, offsets, use_nt_align_buffer);
if constexpr (ensure_unique)
getData().deserializeAndInsertFromPosUniqueForColumnArray(pos, offsets, use_nt_align_buffer, collator);
else
getData().deserializeAndInsertFromPosForColumnArray(pos, offsets, use_nt_align_buffer);
}

void ColumnArray::flushNTAlignBuffer()
Expand Down
70 changes: 66 additions & 4 deletions dbms/src/Columns/ColumnArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>

ColumnArray(const ColumnArray &) = default;

template <bool ensure_unique>
void countSerializeByteSizeImpl(PaddedPODArray<size_t> & byte_size, const TiDB::TiDBCollatorPtr & collator) const;

template <bool has_null, bool ensure_unique>
void serializeToPosImpl(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const;

template <bool ensure_unique>
void deserializeAndInsertFromPosImpl(
PaddedPODArray<const char *> & pos,
bool use_nt_align_buffer,
const TiDB::TiDBCollatorPtr & collator);

public:
/** Create immutable column using immutable arguments. This arguments may be shared with other columns.
* Use IColumn::mutate in order to make mutable column and mutate shared nested columns.
Expand Down Expand Up @@ -82,7 +99,19 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
String &) const override;
const char * deserializeAndInsertFromArena(const char * pos, const TiDB::TiDBCollatorPtr &) override;

void countSerializeByteSizeUnique(PaddedPODArray<size_t> & byte_size, const TiDB::TiDBCollatorPtr & collator)
const override;
void countSerializeByteSize(PaddedPODArray<size_t> & byte_size) const override;

void countSerializeByteSizeUniqueForColumnArray(
PaddedPODArray<size_t> & /* byte_size */,
const IColumn::Offsets & /* array_offsets */,
const TiDB::TiDBCollatorPtr & /* collator */) const override
{
throw Exception(
"Method countSerializeByteSizeUniqueForColumnArray is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void countSerializeByteSizeForColumnArray(
PaddedPODArray<size_t> & /* byte_size */,
const IColumn::Offsets & /* array_offsets */) const override
Expand All @@ -92,10 +121,28 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
ErrorCodes::NOT_IMPLEMENTED);
}

void serializeToPosUnique(
PaddedPODArray<char *> & pos,
size_t start,
size_t length,
bool has_null,
const TiDB::TiDBCollatorPtr & collator,
String * sort_key_container) const override;
void serializeToPos(PaddedPODArray<char *> & pos, size_t start, size_t length, bool has_null) const override;
template <bool has_null>
void serializeToPosImpl(PaddedPODArray<char *> & pos, size_t start, size_t length) const;

void serializeToPosUniqueForColumnArray(
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
size_t /* length */,
bool /* has_null */,
const IColumn::Offsets & /* array_offsets */,
const TiDB::TiDBCollatorPtr & /* collator */,
String * /* sort_key_container */) const override
{
throw Exception(
"Method serializeToPosUniqueForColumnArray is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void serializeToPosForColumnArray(
PaddedPODArray<char *> & /* pos */,
size_t /* start */,
Expand All @@ -108,9 +155,24 @@ class ColumnArray final : public COWPtrHelper<IColumn, ColumnArray>
ErrorCodes::NOT_IMPLEMENTED);
}

void deserializeAndInsertFromPos(PaddedPODArray<char *> & pos, bool use_nt_align_buffer) override;
void deserializeAndInsertFromPosUnique(
PaddedPODArray<const char *> & pos,
bool use_nt_align_buffer,
const TiDB::TiDBCollatorPtr & collator) override;
void deserializeAndInsertFromPos(PaddedPODArray<const char *> & pos, bool use_nt_align_buffer) override;

void deserializeAndInsertFromPosUniqueForColumnArray(
PaddedPODArray<const char *> & /* pos */,
const IColumn::Offsets & /* array_offsets */,
bool /* use_nt_align_buffer */,
const TiDB::TiDBCollatorPtr & /* collator */) override
{
throw Exception(
"Method deserializeAndInsertFromPosUniqueForColumnArray is not supported for " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
void deserializeAndInsertFromPosForColumnArray(
PaddedPODArray<char *> & /* pos */,
PaddedPODArray<const char *> & /* pos */,
const IColumn::Offsets & /* array_offsets */,
bool /* use_nt_align_buffer */) override
{
Expand Down
Loading