diff --git a/cpp/mcap/include/mcap/writer.hpp b/cpp/mcap/include/mcap/writer.hpp index c5242194e..509646295 100644 --- a/cpp/mcap/include/mcap/writer.hpp +++ b/cpp/mcap/include/mcap/writer.hpp @@ -56,10 +56,12 @@ struct MCAP_PUBLIC McapWriterOptions { bool noSummary = false; /** * @brief Target uncompressed Chunk payload size in bytes. Once a Chunk's - * uncompressed data meets or exceeds this size, the Chunk will be compressed - * (if compression is enabled) and written to disk. Note that smaller Chunks - * may be written, such as the last Chunk in the Data section. This option is - * ignored if `noChunking=true`. + * uncompressed data is about to reach or exceed this size, the Chunk will be + * compressed (if enabled) and written to disk. Note that this is a 'soft' + * ceiling: some Chunks can exceed this size, either because of indexing + * data or because a single message is larger than `chunkSize`, in which + * case the Chunk will contain only that one large message. + * This option is ignored if `noChunking=true`. */ uint64_t chunkSize = DefaultChunkSize; /** @@ -432,6 +434,7 @@ class MCAP_PUBLIC McapWriter final { static uint64_t write(IWritable& output, const Footer& footer, bool crcEnabled); static uint64_t write(IWritable& output, const Schema& schema); static uint64_t write(IWritable& output, const Channel& channel); + static uint64_t getRecordSize(const Message& message); static uint64_t write(IWritable& output, const Message& message); static uint64_t write(IWritable& output, const Attachment& attachment); static uint64_t write(IWritable& output, const Metadata& metadata); diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index dd9b82cb9..15f2fcf20 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -547,6 +547,16 @@ Status McapWriter::write(const Message& message) { ++statistics_.channelCount; } + // Before writing a message that would overflow the current chunk, close it. 
+ auto* chunkWriter = getChunkWriter(); + if (chunkWriter != nullptr && /* Chunked? */ + uncompressedSize_ != 0 && /* Current chunk is not empty/new? */ + 9 + getRecordSize(message) + uncompressedSize_ >= chunkSize_ /* Overflowing? */) { + auto& fileOutput = *output_; + writeChunk(fileOutput, *chunkWriter); + } + + // For the chunk-local message index. const uint64_t messageOffset = uncompressedSize_; // Write the message @@ -565,8 +575,7 @@ Status McapWriter::write(const Message& message) { channelMessageCounts[message.channelId] += 1; } - auto* chunkWriter = getChunkWriter(); - if (chunkWriter) { + if (chunkWriter != nullptr) { if (!options_.noMessageIndex) { // Update the message index auto& messageIndex = currentMessageIndex_[message.channelId]; @@ -875,8 +884,12 @@ uint64_t McapWriter::write(IWritable& output, const Channel& channel) { return 9 + recordSize; } +uint64_t McapWriter::getRecordSize(const Message& message) { + return 2 + 4 + 8 + 8 + message.dataSize; +} + uint64_t McapWriter::write(IWritable& output, const Message& message) { - const uint64_t recordSize = 2 + 4 + 8 + 8 + message.dataSize; + const uint64_t recordSize = getRecordSize(message); write(output, OpCode::Message); write(output, recordSize); diff --git a/cpp/test/unit_tests.cpp b/cpp/test/unit_tests.cpp index 99759018d..d5e2603fa 100644 --- a/cpp/test/unit_tests.cpp +++ b/cpp/test/unit_tests.cpp @@ -530,7 +530,8 @@ TEST_CASE("McapReader::readMessages()", "[reader]") { /** * @brief ensures that message index records are only written for the channels present in the - * previous chunk. This test writes two chunks with one message each in separate channels. + * previous chunk. This test writes two chunks with one message each in separate channels, with + * the second message being large enough to guarantee the current chunk will be written out. * If the writer is working correctly, there will be one message index record after each chunk, * one for each message. 
*/ @@ -539,7 +540,7 @@ TEST_CASE("Message index records", "[writer]") { mcap::McapWriter writer; mcap::McapWriterOptions opts("test"); - opts.chunkSize = 100; + opts.chunkSize = 200; opts.compression = mcap::Compression::None; writer.open(buffer, opts); @@ -552,9 +553,10 @@ TEST_CASE("Message index records", "[writer]") { writer.addChannel(channel2); mcap::Message msg; - std::vector data(150); - WriteMsg(writer, channel1.id, 0, 100, 100, data); - WriteMsg(writer, channel2.id, 0, 200, 200, data); + // The first message is small and must not fill the first chunk. + WriteMsg(writer, channel1.id, 0, 100, 100, std::vector{20}); + // The second message would overflow the current chunk, so that chunk is closed and a new one is started. + WriteMsg(writer, channel2.id, 0, 200, 200, std::vector{400}); writer.close();