From 9c256c8675ebcc2f3b22c88f938a1500696daf5c Mon Sep 17 00:00:00 2001 From: Mikael Persson Date: Thu, 12 Dec 2024 15:40:53 -0500 Subject: [PATCH 1/5] Preemptively close chunks before big messages to isolate them --- cpp/mcap/include/mcap/writer.inl | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index dd9b82cb9..ffa348631 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -547,6 +547,16 @@ Status McapWriter::write(const Message& message) { ++statistics_.channelCount; } + // Before writing a large message (bigger than chunk size), close current chunk. + auto* chunkWriter = getChunkWriter(); + if (chunkWriter != nullptr && /* Chunked? */ + uncompressedSize_ != 0 && /* Current chunk is not empty/new? */ + message.dataSize >= chunkSize_ /* Big message? */ ) { + auto& fileOutput = *output_; + writeChunk(fileOutput, *chunkWriter); + } + + // For the chunk-local message index. const uint64_t messageOffset = uncompressedSize_; // Write the message @@ -565,8 +575,7 @@ Status McapWriter::write(const Message& message) { channelMessageCounts[message.channelId] += 1; } - auto* chunkWriter = getChunkWriter(); - if (chunkWriter) { + if (chunkWriter != nullptr) { if (!options_.noMessageIndex) { // Update the message index auto& messageIndex = currentMessageIndex_[message.channelId]; From 92953a3ccaffc651788d0d43c9bfe9d63c1be4c0 Mon Sep 17 00:00:00 2001 From: Mikael Persson Date: Thu, 12 Dec 2024 20:37:01 -0500 Subject: [PATCH 2/5] Fixed unit test, added a unit test --- cpp/test/unit_tests.cpp | 57 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/cpp/test/unit_tests.cpp b/cpp/test/unit_tests.cpp index 99759018d..e6b6e8a70 100644 --- a/cpp/test/unit_tests.cpp +++ b/cpp/test/unit_tests.cpp @@ -552,7 +552,7 @@ TEST_CASE("Message index records", "[writer]") { writer.addChannel(channel2); mcap::Message msg; - std::vector data(150); + std::vector data(90); WriteMsg(writer, channel1.id, 0, 100, 100, data); WriteMsg(writer, channel2.id, 0, 200, 200, data); @@ -584,6 +584,61 @@ TEST_CASE("Message index records", "[writer]") { REQUIRE(messageIndexChannelIds[1] == channel2.id); } +/** + * @brief ensures that messages bigger than the chunk size trigger a new chunk to be written. + * This test writes two chunks with one message each in separate channels. + * If the writer is working correctly, there will be one message index record after each chunk, + * one for each message. + */ +TEST_CASE("Large message isolation", "[writer]") { + Buffer buffer; + + mcap::McapWriter writer; + mcap::McapWriterOptions opts("test"); + opts.chunkSize = 200; + opts.compression = mcap::Compression::None; + + writer.open(buffer, opts); + + mcap::Schema schema("schema", "schemaEncoding", "ab"); + writer.addSchema(schema); + mcap::Channel channel1("topic", "messageEncoding", schema.id); + writer.addChannel(channel1); + mcap::Channel channel2("topic", "messageEncoding", schema.id); + writer.addChannel(channel2); + + mcap::Message msg; + WriteMsg(writer, channel1.id, 0, 100, 100, std::vector{20}); + WriteMsg(writer, channel2.id, 0, 200, 200, std::vector{400}); + + writer.close(); + + // read the records after the starting magic, stopping before the end magic. + mcap::RecordReader reader(buffer, sizeof(mcap::Magic), buffer.size() - sizeof(mcap::Magic)); + + std::vector messageIndexChannelIds; + uint32_t chunkCount = 0; + + for (std::optional rec = reader.next(); rec != std::nullopt; rec = reader.next()) { + requireOk(reader.status()); + if (rec->opcode == mcap::OpCode::MessageIndex) { + mcap::MessageIndex index; + requireOk(mcap::McapReader::ParseMessageIndex(*rec, &index)); + REQUIRE(index.records.size() == 1); + messageIndexChannelIds.push_back(index.channelId); + } + if (rec->opcode == mcap::OpCode::Chunk) { + chunkCount++; + } + } + requireOk(reader.status()); + + REQUIRE(chunkCount == 2); + REQUIRE(messageIndexChannelIds.size() == 2); + REQUIRE(messageIndexChannelIds[0] == channel1.id); + REQUIRE(messageIndexChannelIds[1] == channel2.id); +} + #ifndef MCAP_COMPRESSION_NO_LZ4 TEST_CASE("LZ4 compression", "[reader][writer]") { SECTION("Roundtrip") { From cf5737be36c5df5807d74101cd031cc3be4fbfe0 Mon Sep 17 00:00:00 2001 From: Mikael Persson Date: Mon, 16 Dec 2024 21:02:55 -0500 Subject: [PATCH 3/5] Added option and doc updates to reflect this new behavior --- cpp/mcap/include/mcap/writer.hpp | 13 +++++++++++-- cpp/mcap/include/mcap/writer.inl | 3 ++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/cpp/mcap/include/mcap/writer.hpp b/cpp/mcap/include/mcap/writer.hpp index c5242194e..bcef0721c 100644 --- a/cpp/mcap/include/mcap/writer.hpp +++ b/cpp/mcap/include/mcap/writer.hpp @@ -58,10 +58,19 @@ struct MCAP_PUBLIC McapWriterOptions { * @brief Target uncompressed Chunk payload size in bytes. Once a Chunk's * uncompressed data meets or exceeds this size, the Chunk will be compressed * (if compression is enabled) and written to disk. Note that smaller Chunks - * may be written, such as the last Chunk in the Data section. This option is - * ignored if `noChunking=true`. + * may be written, such as the last Chunk in the Data section or a chunk + * preceding a chunk containing a huge message (see `noHugeMessageChunk`). + * This option is ignored if `noChunking=true`. */ uint64_t chunkSize = DefaultChunkSize; + /** + * @brief Do not isolate huge messages into their own chunks. Huge messages + * are those bigger than `chunkSize`, and could thus fill a whole chunk. + * In most cases, it's preferable to write these messages on their own + * chunks for better reading performance, but it will result in more + * under-sized chunks. + */ + bool noHugeMessageChunk = false; /** * @brief Compression algorithm to use when writing Chunks. This option is * ignored if `noChunking=true`. diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index ffa348631..ac4f3b19b 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -549,7 +549,8 @@ Status McapWriter::write(const Message& message) { // Before writing a large message (bigger than chunk size), close current chunk. auto* chunkWriter = getChunkWriter(); - if (chunkWriter != nullptr && /* Chunked? */ + if (!options_.noHugeMessageChunk && /* Not disabled by user? */ + chunkWriter != nullptr && /* Chunked? */ uncompressedSize_ != 0 && /* Current chunk is not empty/new? */ message.dataSize >= chunkSize_ /* Big message? */ ) { auto& fileOutput = *output_; From 98ef8ef84a9404d0dfbe87e594cc71424269286b Mon Sep 17 00:00:00 2001 From: Mikael Persson Date: Wed, 18 Dec 2024 12:10:59 -0500 Subject: [PATCH 4/5] Updated to @jtbandes suggested behavior of making chunkSize a soft ceiling, rather than soft floor --- cpp/mcap/include/mcap/writer.hpp | 17 +++------ cpp/mcap/include/mcap/writer.inl | 7 ++-- cpp/test/unit_tests.cpp | 61 +++----------------------------- 3 files changed, 12 insertions(+), 73 deletions(-) diff --git a/cpp/mcap/include/mcap/writer.hpp b/cpp/mcap/include/mcap/writer.hpp index bcef0721c..e337094c8 100644 --- a/cpp/mcap/include/mcap/writer.hpp +++ b/cpp/mcap/include/mcap/writer.hpp @@ -56,21 +56,14 @@ struct MCAP_PUBLIC McapWriterOptions { bool noSummary = false; /** * @brief Target uncompressed Chunk payload size in bytes. Once a Chunk's - * uncompressed data meets or exceeds this size, the Chunk will be compressed - * (if compression is enabled) and written to disk. Note that smaller Chunks - * may be written, such as the last Chunk in the Data section or a chunk - * preceding a chunk containing a huge message (see `noHugeMessageChunk`). + * uncompressed data is about to exceed this size, the Chunk will be + * compressed (if enabled) and written to disk. Note that this is a 'soft' + * ceiling as some Chunks could exceed this size due to either indexing + * data or when a single message is larger than `chunkSize`, in which case, + * the Chunk will contain only this one large message. * This option is ignored if `noChunking=true`. */ uint64_t chunkSize = DefaultChunkSize; - /** - * @brief Do not isolate huge messages into their own chunks. Huge messages - * are those bigger than `chunkSize`, and could thus fill a whole chunk. - * In most cases, it's preferable to write these messages on their own - * chunks for better reading performance, but it will result in more - * under-sized chunks. - */ - bool noHugeMessageChunk = false; /** * @brief Compression algorithm to use when writing Chunks. This option is * ignored if `noChunking=true`. diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index ac4f3b19b..49edf47d4 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -547,12 +547,11 @@ Status McapWriter::write(const Message& message) { ++statistics_.channelCount; } - // Before writing a large message (bigger than chunk size), close current chunk. + // Before writing a message that would overflow the current chunk, close it. auto* chunkWriter = getChunkWriter(); - if (!options_.noHugeMessageChunk && /* Not disabled by user? */ - chunkWriter != nullptr && /* Chunked? */ + if (chunkWriter != nullptr && /* Chunked? */ uncompressedSize_ != 0 && /* Current chunk is not empty/new? */ - message.dataSize >= chunkSize_ /* Big message? */ ) { + message.dataSize + uncompressedSize_ >= chunkSize_ /* Overflowing? */) { auto& fileOutput = *output_; writeChunk(fileOutput, *chunkWriter); } diff --git a/cpp/test/unit_tests.cpp b/cpp/test/unit_tests.cpp index e6b6e8a70..d5e2603fa 100644 --- a/cpp/test/unit_tests.cpp +++ b/cpp/test/unit_tests.cpp @@ -530,69 +530,14 @@ TEST_CASE("McapReader::readMessages()", "[reader]") { /** * @brief ensures that message index records are only written for the channels present in the - * previous chunk. This test writes two chunks with one message each in separate channels. + * previous chunk. This test writes two chunks with one message each in separate channels, with + * the second message being large enough to guarantee the current chunk will be written out. * If the writer is working correctly, there will be one message index record after each chunk, * one for each message. */ TEST_CASE("Message index records", "[writer]") { Buffer buffer; - mcap::McapWriter writer; - mcap::McapWriterOptions opts("test"); - opts.chunkSize = 100; - opts.compression = mcap::Compression::None; - - writer.open(buffer, opts); - - mcap::Schema schema("schema", "schemaEncoding", "ab"); - writer.addSchema(schema); - mcap::Channel channel1("topic", "messageEncoding", schema.id); - writer.addChannel(channel1); - mcap::Channel channel2("topic", "messageEncoding", schema.id); - writer.addChannel(channel2); - - mcap::Message msg; - std::vector data(90); - WriteMsg(writer, channel1.id, 0, 100, 100, data); - WriteMsg(writer, channel2.id, 0, 200, 200, data); - - writer.close(); - - // read the records after the starting magic, stopping before the end magic. - mcap::RecordReader reader(buffer, sizeof(mcap::Magic), buffer.size() - sizeof(mcap::Magic)); - - std::vector messageIndexChannelIds; - uint32_t chunkCount = 0; - - for (std::optional rec = reader.next(); rec != std::nullopt; rec = reader.next()) { - requireOk(reader.status()); - if (rec->opcode == mcap::OpCode::MessageIndex) { - mcap::MessageIndex index; - requireOk(mcap::McapReader::ParseMessageIndex(*rec, &index)); - REQUIRE(index.records.size() == 1); - messageIndexChannelIds.push_back(index.channelId); - } - if (rec->opcode == mcap::OpCode::Chunk) { - chunkCount++; - } - } - requireOk(reader.status()); - - REQUIRE(chunkCount == 2); - REQUIRE(messageIndexChannelIds.size() == 2); - REQUIRE(messageIndexChannelIds[0] == channel1.id); - REQUIRE(messageIndexChannelIds[1] == channel2.id); -} - -/** - * @brief ensures that messages bigger than the chunk size trigger a new chunk to be written. - * This test writes two chunks with one message each in separate channels. - * If the writer is working correctly, there will be one message index record after each chunk, - * one for each message. - */ -TEST_CASE("Large message isolation", "[writer]") { - Buffer buffer; - mcap::McapWriter writer; mcap::McapWriterOptions opts("test"); opts.chunkSize = 200; @@ -608,7 +553,9 @@ TEST_CASE("Large message isolation", "[writer]") { writer.addChannel(channel2); mcap::Message msg; + // First message should not fill first chunk. WriteMsg(writer, channel1.id, 0, 100, 100, std::vector{20}); + // Second message fills current chunk and triggers a new one. WriteMsg(writer, channel2.id, 0, 200, 200, std::vector{400}); writer.close(); From 5c9e998b06aace68715c91e5a06667e53aafbe47 Mon Sep 17 00:00:00 2001 From: Mikael Persson Date: Thu, 19 Dec 2024 13:06:56 -0500 Subject: [PATCH 5/5] Fixed message size to actual record size and preamble --- cpp/mcap/include/mcap/writer.hpp | 1 + cpp/mcap/include/mcap/writer.inl | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/cpp/mcap/include/mcap/writer.hpp b/cpp/mcap/include/mcap/writer.hpp index e337094c8..509646295 100644 --- a/cpp/mcap/include/mcap/writer.hpp +++ b/cpp/mcap/include/mcap/writer.hpp @@ -434,6 +434,7 @@ class MCAP_PUBLIC McapWriter final { static uint64_t write(IWritable& output, const Footer& footer, bool crcEnabled); static uint64_t write(IWritable& output, const Schema& schema); static uint64_t write(IWritable& output, const Channel& channel); + static uint64_t getRecordSize(const Message& message); static uint64_t write(IWritable& output, const Message& message); static uint64_t write(IWritable& output, const Attachment& attachment); static uint64_t write(IWritable& output, const Metadata& metadata); diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index 49edf47d4..15f2fcf20 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -551,7 +551,7 @@ Status McapWriter::write(const Message& message) { auto* chunkWriter = getChunkWriter(); if (chunkWriter != nullptr && /* Chunked? */ uncompressedSize_ != 0 && /* Current chunk is not empty/new? */ - message.dataSize + uncompressedSize_ >= chunkSize_ /* Overflowing? */) { + 9 + getRecordSize(message) + uncompressedSize_ >= chunkSize_ /* Overflowing? */) { auto& fileOutput = *output_; writeChunk(fileOutput, *chunkWriter); } @@ -884,8 +884,12 @@ uint64_t McapWriter::write(IWritable& output, const Channel& channel) { return 9 + recordSize; } +uint64_t McapWriter::getRecordSize(const Message& message) { + return 2 + 4 + 8 + 8 + message.dataSize; +} + uint64_t McapWriter::write(IWritable& output, const Message& message) { - const uint64_t recordSize = 2 + 4 + 8 + 8 + message.dataSize; + const uint64_t recordSize = getRecordSize(message); write(output, OpCode::Message); write(output, recordSize);