Skip to content

Commit

Permalink
feat: Optimize PrestoBatchVectorSerializer [5/7]: Serialize ConstantV…
Browse files Browse the repository at this point in the history
…ectors (#12076)

Summary:
Pull Request resolved: #12076

Context:
This is a series of diffs in which I reimplement PrestoBatchVectorSerializer to write directly to the output stream,
rather than the indirect route it currently uses via VectorStreams. Reusing VectorStreams and much of the code
for PrestoIterativeVectorSerializer prevented us from capturing all of the performance benefits of writing data in
batches rather than row by row. These changes combined will speed up PrestoBatchVectorSerializer 2-3x (as
measured in Presto queries and other use cases).

In the final diff I will integrate the new serialization functions into PrestoBatchVectorSerializer's serialize
function which will switch it to the new optimized writing path, therefore I will land these changes as a stack.

In this diff:
I provide the implementations for serializing ConstantVectors.

Differential Revision: D68114215
  • Loading branch information
Kevin Wilfong authored and facebook-github-bot committed Jan 13, 2025
1 parent 864eeb7 commit 6b06d0b
Show file tree
Hide file tree
Showing 2 changed files with 479 additions and 6 deletions.
204 changes: 204 additions & 0 deletions velox/serializers/PrestoBatchVectorSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,208 @@ void PrestoBatchVectorSerializer::serializeMapVector(
// Wirte out the hasNull and isNull flags.
writeNullsSegment(hasNulls, vector, ranges, numRows, stream);
}

/// Specialization for opaque types.
template <>
void PrestoBatchVectorSerializer::writeSingleValue(
const std::shared_ptr<void>& value,
const TypePtr& type,
BufferedOutputStream* stream,
bool withNull) {
// Write out the header.
writeHeader(type, stream);
// Write out the number of nulls.
writeInt32(stream, withNull ? 2 : 1);

const std::string serializedValue =
type->asOpaque().getSerializeFunc()(value);

// Write out the lengths.
if (withNull) {
writeInt32(stream, 0);
}
writeInt32(stream, serializedValue.size());

// Write out the hasNull and isNull flags.
if (withNull) {
stream->write(&kOne, 1);
stream->write(&kSingleNull, 1);
} else {
stream->write(&kZero, 1);
}

// Write out the total length of the values, i.e. the length of the only
// value.
writeInt32(stream, serializedValue.size());

// Write out the single non-null value.
stream->write(serializedValue.data(), serializedValue.size());
}

void PrestoBatchVectorSerializer::writeSingleNull(
const TypePtr& type,
BufferedOutputStream* stream) {
switch (type->kind()) {
case TypeKind::BOOLEAN:
case TypeKind::TINYINT:
case TypeKind::SMALLINT:
case TypeKind::INTEGER:
case TypeKind::BIGINT:
case TypeKind::REAL:
case TypeKind::DOUBLE:
case TypeKind::TIMESTAMP:
case TypeKind::HUGEINT:
case TypeKind::UNKNOWN:
// Write the header.
writeHeader(type, stream);
// Write the number of rows.
writeInt32(stream, 1);
// Write the hasNull flag.
stream->write(&kOne, 1);
// Write the isNull flags.
stream->write(&kSingleNull, 1);
break;
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
// Write the header.
writeHeader(type, stream);
// Write the number of rows.
writeInt32(stream, 1);
// Write the offsets of the (non-existent) values.
writeInt32(stream, 0);
// Write the hasNull flag.
stream->write(&kOne, 1);
// Write the isNull flags.
stream->write(&kSingleNull, 1);
// Write the total size of the (non-existent) values.
writeInt32(stream, 0);
break;
case TypeKind::ARRAY:
// Write the header.
writeHeader(type, stream);
// Write the non-existent elements.
writeEmptyVector(type->childAt(0), stream);
// Write the number of rows.
writeInt32(stream, 1);
// Write the offsets of the (non-existent) values.
writeInt32(stream, 0);
writeInt32(stream, 0);
// Write the hasNull flag.
stream->write(&kOne, 1);
// Write the isNull flags.
stream->write(&kSingleNull, 1);
break;
case TypeKind::MAP:
// Write the header.
writeHeader(type, stream);
// Write the non-existent keys.
writeEmptyVector(type->childAt(0), stream);
// Write the non-existent values.
writeEmptyVector(type->childAt(1), stream);
// Write the size of the hash map (which we don't use, so it's -1).
writeInt32(stream, -1);
// Write the number of rows.
writeInt32(stream, 1);
// Write the offsets of the (non-existent) values.
writeInt32(stream, 0);
writeInt32(stream, 0);
// Write the hasNull flag.
stream->write(&kOne, 1);
// Write the isNull flags.
stream->write(&kSingleNull, 1);
break;
case TypeKind::ROW:
// Write the header.
writeHeader(type, stream);
// Write the number of children.
writeInt32(stream, type->size());
// Write the non-existent children.
for (int i = 0; i < type->size(); ++i) {
writeEmptyVector(type->childAt(i), stream);
}
// Write the number of rows.
writeInt32(stream, 1);
// Write the offsets of the (non-existent) values.
writeInt32(stream, 0);
writeInt32(stream, 0);
// Write the hasNull flag.
stream->write(&kOne, 1);
// Write the isNull flags.
stream->write(&kSingleNull, 1);
break;
default:
VELOX_UNSUPPORTED();
}
}

void PrestoBatchVectorSerializer::writeEmptyVector(
const TypePtr& type,
BufferedOutputStream* stream) {
switch (type->kind()) {
case TypeKind::BOOLEAN:
case TypeKind::TINYINT:
case TypeKind::SMALLINT:
case TypeKind::INTEGER:
case TypeKind::BIGINT:
case TypeKind::REAL:
case TypeKind::DOUBLE:
case TypeKind::TIMESTAMP:
case TypeKind::HUGEINT:
case TypeKind::UNKNOWN:
// Write the header.
writeHeader(type, stream);
// Write the number of rows.
writeInt32(stream, 0);
break;
case TypeKind::VARCHAR:
case TypeKind::VARBINARY:
// Write the header.
writeHeader(type, stream);
// Write the number of rows.
writeInt32(stream, 0);
// Write the total size of the non-existent values.
writeInt32(stream, 0);
break;
case TypeKind::ARRAY:
// Write the header.
writeHeader(type, stream);
// Write the non-existent elements.
writeEmptyVector(type->childAt(0), stream);
// Write the number of rows.
writeInt32(stream, 0);
// Write the offsets of the non-existent values.
writeInt32(stream, 0);
break;
case TypeKind::MAP:
// Write the header.
writeHeader(type, stream);
// Write the non-existent keys.
writeEmptyVector(type->childAt(0), stream);
// Write the non-existent values.
writeEmptyVector(type->childAt(1), stream);
// Write the size of the hash map (which we don't use, so it's -1).
writeInt32(stream, -1);
// Write the number of rows.
writeInt32(stream, 0);
// Write the offsets of the non-existent values.
writeInt32(stream, 0);
break;
case TypeKind::ROW:
// Write the header.
writeHeader(type, stream);
// Write the number of children.
writeInt32(stream, type->size());
// Write the non-existent children.
for (int i = 0; i < type->size(); ++i) {
writeEmptyVector(type->childAt(i), stream);
}
// Write the number of rows.
writeInt32(stream, 0);
// Write the offsets of the non-existent values.
writeInt32(stream, 0);
break;
default:
VELOX_UNSUPPORTED();
}
}
} // namespace facebook::velox::serializer::presto::detail
Loading

0 comments on commit 6b06d0b

Please sign in to comment.