From d42ed2491e574e9e86fb9d6dce5d67784b5e26a8 Mon Sep 17 00:00:00 2001 From: Bennett Hardwick Date: Fri, 6 Dec 2024 13:13:07 +1000 Subject: [PATCH] rust: write attachments in parts (#1285) ### Changelog rust: support writing attachments in multiple chunks ### Docs None ### Description This change supports writing large attachments to MCAP files without the need to load them into memory. Currently, in order to write an attachment, you need to load the entire file into memory, create an `Attachment` record, then use that with the `attach` method on the writer. This means that it is not possible to attach files when the file exceeds the available memory on the computer. This change adds The methods `start_attachment`, `put_attachment_bytes` and `finish_attachment` for interactively writing an attachment without needing to buffer the file in memory. It looks like this: ```rust let writer = Writer::new(..); let attachment_length = ..; let header = AttachmentHeader { .. }; writer.start_attachment(attachment_length, header)?; writer.put_attachment_bytes(..)?; writer.put_attachment_bytes(..)?; writer.put_attachment_bytes(..)?; writer.finish_attachment()?; ``` I've updated the current `attach` method to use this approach and the roundtrip tests pass. I've also added some tests for using this API directly.
BeforeAfter
attachments must be loaded into memory to be written attachments can be written in parts
--------- Co-authored-by: james-rms --- rust/Cargo.toml | 2 +- rust/src/lib.rs | 8 ++ rust/src/write.rs | 294 ++++++++++++++++++++++++++++++++------- rust/tests/attachment.rs | 64 +++++++++ 4 files changed, 313 insertions(+), 55 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 1e9840ed2..753748146 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ] repository = "https://github.com/foxglove/mcap" documentation = "https://docs.rs/mcap" readme = "README.md" -version = "0.12.1" +version = "0.13.0" edition = "2021" license = "MIT" diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 859f08398..5a6207865 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -89,6 +89,14 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum McapError { + #[error("tried to write to output while attachment is in progress")] + AttachmentInProgress, + #[error("tried to write bytes to an attachment but no attachment was in progress")] + AttachmentNotInProgress, + #[error("tried to write {excess} more bytes to attachment than the requested attachment length {attachment_length}")] + AttachmentTooLarge { excess: u64, attachment_length: u64 }, + #[error("tried to finish writing attachment but current length {current} was not expected length {expected}")] + AttachmentIncomplete { current: u64, expected: u64 }, #[error("Bad magic number")] BadMagic, #[error("Footer record couldn't be found at the end of the file, before the magic bytes")] diff --git a/rust/src/write.rs b/rust/src/write.rs index 298dee243..ae588ad28 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -13,7 +13,7 @@ use byteorder::{WriteBytesExt, LE}; use crate::{ chunk_sink::{ChunkMode, ChunkSink}, io_utils::CountingCrcWriter, - records::{self, op, MessageHeader, Record}, + records::{self, op, AttachmentHeader, AttachmentIndex, MessageHeader, Record}, Attachment, Channel, Compression, McapError, McapResult, Message, Schema, MAGIC, }; @@ -25,11 +25,12 @@ pub use records::Metadata; enum WriteMode { Raw(W, ChunkMode), Chunk(ChunkWriter), + Attachment(AttachmentWriter, ChunkMode), } -fn op_and_len(w: &mut W, op: u8, len: usize) -> io::Result<()> { +fn op_and_len(w: &mut W, op: u8, len: u64) -> io::Result<()> { w.write_u8(op)?; - w.write_u64::(len as u64)?; + w.write_u64::(len)?; Ok(()) } @@ -41,7 +42,7 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { let mut rec_buf = Vec::new(); Cursor::new(&mut rec_buf).write_le($b).unwrap(); - op_and_len(w, $op, rec_buf.len())?; + op_and_len(w, $op, rec_buf.len() as _)?; w.write_all(&rec_buf)?; }}; } @@ -58,7 +59,7 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { op_and_len( w, op::SCHEMA, - header_buf.len() + size_of::() + data.len(), + (header_buf.len() + size_of::() + data.len()) as _, )?; w.write_all(&header_buf)?; w.write_u32::(data.len() as u32)?; @@ -69,7 +70,7 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { let mut header_buf = Vec::new(); Cursor::new(&mut header_buf).write_le(header).unwrap(); - op_and_len(w, op::MESSAGE, header_buf.len() + data.len())?; + op_and_len(w, op::MESSAGE, (header_buf.len() + data.len()) as _)?; w.write_all(&header_buf)?; w.write_all(data)?; } @@ -80,21 +81,8 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { unreachable!("MessageIndexes handle their own serialization to recycle the buffer between indexes") } Record::ChunkIndex(c) => record!(op::CHUNK_INDEX, c), - Record::Attachment { header, data } => { - let mut header_buf = Vec::new(); - Cursor::new(&mut header_buf).write_le(header).unwrap(); - op_and_len( - w, - op::ATTACHMENT, - header_buf.len() + size_of::() + data.len() + size_of::(), /* crc */ - )?; - - let mut checksummer = CountingCrcWriter::new(w); - checksummer.write_all(&header_buf)?; - checksummer.write_u64::(data.len() as u64)?; - checksummer.write_all(data)?; - let (w, crc) = checksummer.finalize(); - w.write_u32::(crc)?; + Record::Attachment { .. } => { + unreachable!("Attachments handle their own serialization to handle large files") } Record::AttachmentIndex(ai) => record!(op::ATTACHMENT_INDEX, ai), Record::Statistics(s) => record!(op::STATISTICS, s), @@ -388,37 +376,124 @@ impl<'a, W: Write + Seek> Writer<'a, W> { Ok(()) } - pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> { - let header = records::AttachmentHeader { - log_time: attachment.log_time, - create_time: attachment.create_time, - name: attachment.name.clone(), - media_type: attachment.media_type.clone(), + /// Start writing an attachment. + /// + /// This is a low level API. For small attachments, use [`Self::attach`]. + /// + /// To start writing an attachment call this method with the [`AttachmentHeader`] as well as + /// the length of the attachment in bytes. It is important this length is exact otherwise the + /// writer will be left in an error state. + /// + /// This call should be followed by one or more calls to [`Self::put_attachment_bytes`]. + /// + /// Once all attachment bytes have been written the attachment must be completed with a call to + /// [`Self::finish_attachment`]. Failing to finish the attachment will leave the write in an + /// error state. + /// + /// # Example + /// ```rust + /// # use mcap::write::Writer; + /// # use mcap::records::AttachmentHeader; + /// # + /// # fn run() -> Result<(), Box> { + /// # let mut output = vec![]; + /// # let mut writer = Writer::new(std::io::Cursor::new(&mut output))?; + /// let attachment_length = 6; + /// + /// // Start the attachment + /// writer.start_attachment(attachment_length, AttachmentHeader { + /// log_time: 100, + /// create_time: 200, + /// name: "my-attachment".into(), + /// media_type: "application/octet-stream".into() + /// })?; + /// + /// // Write all the bytes for the attachment. The amount of bytes written must + /// // match the length specified when the attachment was started. + /// writer.put_attachment_bytes(&[ 1, 2, 3, 4 ])?; + /// writer.put_attachment_bytes(&[ 5, 6 ])?; + /// + /// // Finsh writing the attachment. + /// writer.finish_attachment()?; + /// # + /// # Ok(()) + /// # } + /// # run().expect("should succeed"); + /// ``` + pub fn start_attachment( + &mut self, + attachment_length: u64, + header: AttachmentHeader, + ) -> McapResult<()> { + self.finish_chunk()?; + + let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); + + let WriteMode::Raw(w, chunk_mode) = prev_writer else { + panic!( + "since finish_chunk was called, write mode is guaranteed to be raw at this point" + ); }; - // Attachments don't live in chunks. - let w = self.finish_chunk()?; + self.writer = Some(WriteMode::Attachment( + AttachmentWriter::new(w, attachment_length, header)?, + chunk_mode, + )); - let offset = w.stream_position()?; + Ok(()) + } - write_record( - w, - &Record::Attachment { - header, - data: Cow::Borrowed(&attachment.data), - }, - )?; + /// Write bytes to the current attachment. + /// + /// This is a low level API. For small attachments, use [`Self::attach`]. + /// + /// Before calling this method call [`Self::start_attachment`]. + pub fn put_attachment_bytes(&mut self, bytes: &[u8]) -> McapResult<()> { + let Some(WriteMode::Attachment(writer, _)) = &mut self.writer else { + return Err(McapError::AttachmentNotInProgress); + }; - let length = w.stream_position()? - offset; - self.attachment_indexes.push(records::AttachmentIndex { - offset, - length, + writer.put_bytes(bytes)?; + + Ok(()) + } + + /// Finish the current attachment. + /// + /// This is a low level API. For small attachments, use [`Self::attach`]. + /// + /// Before calling this method call [`Self::start_attachment`] and write bytes to the + /// attachment using [`Self::put_attachment_bytes`]. + pub fn finish_attachment(&mut self) -> McapResult<()> { + let Some(WriteMode::Attachment(..)) = &mut self.writer else { + return Err(McapError::AttachmentNotInProgress); + }; + + let Some(WriteMode::Attachment(writer, chunk_mode)) = self.writer.take() else { + panic!("WriteMode is guaranteed to be attachment by this point"); + }; + + let (writer, attachment_index) = writer.finish()?; + + self.attachment_indexes.push(attachment_index); + + self.writer = Some(WriteMode::Raw(writer, chunk_mode)); + + Ok(()) + } + + /// Write an attachment to the MCAP file + pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> { + let header = records::AttachmentHeader { log_time: attachment.log_time, create_time: attachment.create_time, - data_size: attachment.data.len() as u64, name: attachment.name.clone(), media_type: attachment.media_type.clone(), - }); + }; + + self.start_attachment(attachment.data.len() as _, header)?; + self.put_attachment_bytes(&attachment.data[..])?; + self.finish_attachment()?; Ok(()) } @@ -467,6 +542,12 @@ impl<'a, W: Write + Seek> Writer<'a, W> { /// Starts a new chunk if we haven't done so already. fn chunkin_time(&mut self) -> McapResult<&mut ChunkWriter> { + // It is not possible to start writing a chunk if we're still writing an attachment. Return + // an error instead. + if let Some(WriteMode::Attachment(..)) = self.writer { + return Err(McapError::AttachmentNotInProgress); + } + // Some Rust tricky: we can't move the writer out of self.writer, // leave that empty for a bit, and then replace it with a ChunkWriter. // (That would leave it in an unspecified state if we bailed here!) @@ -487,14 +568,21 @@ impl<'a, W: Write + Seek> Writer<'a, W> { chunk => chunk, }); - match &mut self.writer { - Some(WriteMode::Chunk(c)) => Ok(c), - _ => unreachable!(), - } + let Some(WriteMode::Chunk(c)) = &mut self.writer else { + unreachable!("we're not in an attachment and write mode was set to chunk above") + }; + + Ok(c) } /// Finish the current chunk, if we have one. fn finish_chunk(&mut self) -> McapResult<&mut W> { + // If we're currently writing an attachment then we're not writing a chunk. Return an + // error instead. + if let Some(WriteMode::Attachment(..)) = self.writer { + return Err(McapError::AttachmentNotInProgress); + } + // See above let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); @@ -504,13 +592,14 @@ impl<'a, W: Write + Seek> Writer<'a, W> { self.chunk_indexes.push(index); WriteMode::Raw(w, mode) } - raw => raw, + mode => mode, }); - match &mut self.writer { - Some(WriteMode::Raw(w, _)) => Ok(w), - _ => unreachable!(), - } + let Some(WriteMode::Raw(w, _)) = &mut self.writer else { + unreachable!("we're not in an attachment and write mode raw was set above") + }; + + Ok(w) } /// Finishes any current chunk and writes out the rest of the file. @@ -932,7 +1021,7 @@ impl ChunkWriter { let mut sink = stream.finish()?; let data_end = sink.stream_position()?; let compressed_size = data_end - self.data_start; - let record_size = (data_end - self.header_start) as usize - 9; // 1 byte op, 8 byte len + let record_size = (data_end - self.header_start) - 9; // 1 byte op, 8 byte len // Back up, write our finished header, then continue at the end of the stream. sink.seek(SeekFrom::Start(self.header_start))?; @@ -976,7 +1065,7 @@ impl ChunkWriter { }; Cursor::new(&mut index_buf).write_le(&index)?; - op_and_len(&mut sink, op::MESSAGE_INDEX, index_buf.len())?; + op_and_len(&mut sink, op::MESSAGE_INDEX, index_buf.len() as _)?; sink.write_all(&index_buf)?; } let end_of_indexes = sink.stream_position()?; @@ -1005,3 +1094,100 @@ impl ChunkWriter { Ok((writer, mode, index)) } } + +struct AttachmentWriter { + record_offset: u64, + attachment_offset: u64, + attachment_length: u64, + header: AttachmentHeader, + writer: CountingCrcWriter, +} + +impl AttachmentWriter { + /// Create a new [`AttachmentWriter`] and write the attachment header to the output. + fn new(mut writer: W, attachment_length: u64, header: AttachmentHeader) -> McapResult { + let record_offset = writer.stream_position()?; + + // We have to write to a temporary buffer here as the CountingCrcWriter doesn't support + // seeking. + let mut header_buf = vec![]; + Cursor::new(&mut header_buf).write_le(&header)?; + + op_and_len( + &mut writer, + op::ATTACHMENT, + header_buf.len() as u64 + // attachment_length + + size_of::() as u64 + // attachment + + attachment_length + // crc + + size_of::() as u64, + )?; + + let mut writer = CountingCrcWriter::new(writer); + writer.write_all(&header_buf)?; + writer.write_u64::(attachment_length)?; + + let attachment_offset = writer.position(); + + Ok(Self { + record_offset, + attachment_offset, + attachment_length, + header, + writer, + }) + } + + /// Write bytes to the attachment. + /// + /// This method will return an error if the provided bytes exceed the space remaining in the + /// attachment. + fn put_bytes(&mut self, bytes: &[u8]) -> McapResult<()> { + let attachment_position = self.writer.position() - self.attachment_offset; + + let space = self.attachment_length - attachment_position; + let byte_length = bytes.len() as u64; + + if byte_length > space { + return Err(McapError::AttachmentTooLarge { + excess: byte_length - space, + attachment_length: self.attachment_length, + }); + } + + self.writer.write_all(bytes)?; + Ok(()) + } + + /// Finish the attachment and write the CRC to the output, returning the [`AttachmentIndex`] + /// for the written attachment. + fn finish(self) -> McapResult<(W, AttachmentIndex)> { + let expected = self.attachment_length; + let current = self.writer.position() - self.attachment_offset; + + if expected != current { + return Err(McapError::AttachmentIncomplete { expected, current }); + } + + let (mut writer, crc) = self.writer.finalize(); + writer.write_u32::(crc)?; + + let offset = self.record_offset; + let length = writer.stream_position()? - offset; + + Ok(( + writer, + AttachmentIndex { + offset, + length, + log_time: self.header.log_time, + media_type: self.header.media_type, + name: self.header.name, + create_time: self.header.create_time, + data_size: self.attachment_length, + }, + )) + } +} diff --git a/rust/tests/attachment.rs b/rust/tests/attachment.rs index 57eb98b1e..d57e405b5 100644 --- a/rust/tests/attachment.rs +++ b/rust/tests/attachment.rs @@ -1,6 +1,7 @@ mod common; use common::*; +use mcap::records::AttachmentHeader; use std::{borrow::Cow, io::BufWriter}; @@ -33,6 +34,69 @@ fn smoke() -> Result<()> { Ok(()) } +#[test] +fn test_attach_in_multiple_parts() -> Result<()> { + let mut tmp = tempfile()?; + let mut writer = mcap::Writer::new(BufWriter::new(&mut tmp))?; + + let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let (left, right) = data.split_at(5); + + writer.start_attachment( + 10, + AttachmentHeader { + log_time: 100, + create_time: 200, + name: "great-attachment".into(), + media_type: "application/octet-stream".into(), + }, + )?; + + writer.put_attachment_bytes(left)?; + writer.put_attachment_bytes(right)?; + + writer.finish_attachment()?; + + drop(writer); + + let ours = unsafe { Mmap::map(&tmp) }?; + let summary = mcap::Summary::read(&ours)?; + + let expected_summary = Some(mcap::Summary { + stats: Some(mcap::records::Statistics { + attachment_count: 1, + ..Default::default() + }), + attachment_indexes: vec![mcap::records::AttachmentIndex { + // offset depends on the length of the embedded library string, which includes the crate version + offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64), + length: 95, + log_time: 100, + create_time: 200, + data_size: 10, + name: "great-attachment".into(), + media_type: "application/octet-stream".into(), + }], + ..Default::default() + }); + assert_eq!(summary, expected_summary); + + let expected_attachment = mcap::Attachment { + log_time: 100, + create_time: 200, + name: "great-attachment".into(), + media_type: "application/octet-stream".into(), + data: Cow::Borrowed(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), + }; + + assert_eq!( + mcap::read::attachment(&ours, &summary.unwrap().attachment_indexes[0])?, + expected_attachment + ); + + Ok(()) +} + #[test] fn round_trip() -> Result<()> { let mapped = map_mcap("../tests/conformance/data/OneAttachment/OneAttachment.mcap")?;