Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preemptively close chunks before big messages to isolate them #1291

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cpp/mcap/include/mcap/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ struct MCAP_PUBLIC McapWriterOptions {
bool noSummary = false;
/**
* @brief Target uncompressed Chunk payload size in bytes. Once a Chunk's
* uncompressed data meets or exceeds this size, the Chunk will be compressed
* (if compression is enabled) and written to disk. Note that smaller Chunks
* may be written, such as the last Chunk in the Data section. This option is
* ignored if `noChunking=true`.
* uncompressed data is about to exceed this size, the Chunk will be
* compressed (if enabled) and written to disk. Note that this is a 'soft'
* ceiling as some Chunks could exceed this size due to either indexing
* data or when a single message is larger than `chunkSize`, in which case,
* the Chunk will contain only this one large message.
* This option is ignored if `noChunking=true`.
*/
uint64_t chunkSize = DefaultChunkSize;
/**
Expand Down Expand Up @@ -432,6 +434,7 @@ class MCAP_PUBLIC McapWriter final {
static uint64_t write(IWritable& output, const Footer& footer, bool crcEnabled);
static uint64_t write(IWritable& output, const Schema& schema);
static uint64_t write(IWritable& output, const Channel& channel);
static uint64_t getRecordSize(const Message& message);
static uint64_t write(IWritable& output, const Message& message);
static uint64_t write(IWritable& output, const Attachment& attachment);
static uint64_t write(IWritable& output, const Metadata& metadata);
Expand Down
19 changes: 16 additions & 3 deletions cpp/mcap/include/mcap/writer.inl
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,16 @@ Status McapWriter::write(const Message& message) {
++statistics_.channelCount;
}

// Before writing a message that would overflow the current chunk, close it.
auto* chunkWriter = getChunkWriter();
if (chunkWriter != nullptr && /* Chunked? */
uncompressedSize_ != 0 && /* Current chunk is not empty/new? */
9 + getRecordSize(message) + uncompressedSize_ >= chunkSize_ /* Overflowing? */) {
auto& fileOutput = *output_;
writeChunk(fileOutput, *chunkWriter);
}

// For the chunk-local message index.
const uint64_t messageOffset = uncompressedSize_;

// Write the message
Expand All @@ -565,8 +575,7 @@ Status McapWriter::write(const Message& message) {
channelMessageCounts[message.channelId] += 1;
}

auto* chunkWriter = getChunkWriter();
if (chunkWriter) {
if (chunkWriter != nullptr) {
if (!options_.noMessageIndex) {
// Update the message index
auto& messageIndex = currentMessageIndex_[message.channelId];
Expand Down Expand Up @@ -875,8 +884,12 @@ uint64_t McapWriter::write(IWritable& output, const Channel& channel) {
return 9 + recordSize;
}

uint64_t McapWriter::getRecordSize(const Message& message) {
return 2 + 4 + 8 + 8 + message.dataSize;
}

uint64_t McapWriter::write(IWritable& output, const Message& message) {
const uint64_t recordSize = 2 + 4 + 8 + 8 + message.dataSize;
const uint64_t recordSize = getRecordSize(message);

write(output, OpCode::Message);
write(output, recordSize);
Expand Down
12 changes: 7 additions & 5 deletions cpp/test/unit_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,8 @@ TEST_CASE("McapReader::readMessages()", "[reader]") {

/**
* @brief ensures that message index records are only written for the channels present in the
* previous chunk. This test writes two chunks with one message each in separate channels.
* previous chunk. This test writes two chunks with one message each in separate channels, with
* the second message being large enough to guarantee the current chunk will be written out.
* If the writer is working correctly, there will be one message index record after each chunk,
* one for each message.
*/
Expand All @@ -539,7 +540,7 @@ TEST_CASE("Message index records", "[writer]") {

mcap::McapWriter writer;
mcap::McapWriterOptions opts("test");
opts.chunkSize = 100;
opts.chunkSize = 200;
opts.compression = mcap::Compression::None;

writer.open(buffer, opts);
Expand All @@ -552,9 +553,10 @@ TEST_CASE("Message index records", "[writer]") {
writer.addChannel(channel2);

mcap::Message msg;
std::vector<std::byte> data(150);
WriteMsg(writer, channel1.id, 0, 100, 100, data);
WriteMsg(writer, channel2.id, 0, 200, 200, data);
// First message should not fill first chunk.
WriteMsg(writer, channel1.id, 0, 100, 100, std::vector<std::byte>{20});
// Second message fills current chunk and triggers a new one.
WriteMsg(writer, channel2.id, 0, 200, 200, std::vector<std::byte>{400});

writer.close();

Expand Down
Loading