From 742c3c74232bd9f83a1b54aa03b9a80a348f73ed Mon Sep 17 00:00:00 2001 From: Bennett Hardwick Date: Thu, 5 Dec 2024 14:46:29 +1100 Subject: [PATCH] rust: write attachments in multiple parts --- rust/src/lib.rs | 8 ++ rust/src/write.rs | 263 +++++++++++++++++++++++++++++++-------- rust/tests/attachment.rs | 64 ++++++++++ 3 files changed, 281 insertions(+), 54 deletions(-) diff --git a/rust/src/lib.rs b/rust/src/lib.rs index b094e352d..96797ad09 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -89,6 +89,14 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum McapError { + #[error("tried to write to output while attachment is in progress")] + AttachmentInProgress, + #[error("tried to write bytes to an attachment but no attachment was in progress")] + AttachmentNotInProgress, + #[error("tried to write {excess} more bytes to attachment than the requested attachment length {attachment_length}")] + AttachmentTooLarge { excess: u64, attachment_length: u64 }, + #[error("tried to finish writing attachment but current length {current} was not expected length {expected}")] + AttachmentIncomplete { current: u64, expected: u64 }, #[error("Bad magic number")] BadMagic, #[error("Footer record couldn't be found at the end of the file, before the magic bytes")] diff --git a/rust/src/write.rs b/rust/src/write.rs index 298dee243..fe2fd23a6 100644 --- a/rust/src/write.rs +++ b/rust/src/write.rs @@ -13,7 +13,7 @@ use byteorder::{WriteBytesExt, LE}; use crate::{ chunk_sink::{ChunkMode, ChunkSink}, io_utils::CountingCrcWriter, - records::{self, op, MessageHeader, Record}, + records::{self, op, AttachmentHeader, AttachmentIndex, MessageHeader, Record}, Attachment, Channel, Compression, McapError, McapResult, Message, Schema, MAGIC, }; @@ -25,11 +25,12 @@ pub use records::Metadata; enum WriteMode { Raw(W, ChunkMode), Chunk(ChunkWriter), + Attachment(AttachmentWriter, ChunkMode), } -fn op_and_len(w: &mut W, op: u8, len: usize) -> io::Result<()> { +fn op_and_len(w: &mut W, op: u8, 
len: u64) -> io::Result<()> { w.write_u8(op)?; - w.write_u64::(len as u64)?; + w.write_u64::(len)?; Ok(()) } @@ -41,7 +42,7 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { let mut rec_buf = Vec::new(); Cursor::new(&mut rec_buf).write_le($b).unwrap(); - op_and_len(w, $op, rec_buf.len())?; + op_and_len(w, $op, rec_buf.len() as _)?; w.write_all(&rec_buf)?; }}; } @@ -58,7 +59,7 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { op_and_len( w, op::SCHEMA, - header_buf.len() + size_of::() + data.len(), + (header_buf.len() + size_of::() + data.len()) as _, )?; w.write_all(&header_buf)?; w.write_u32::(data.len() as u32)?; @@ -69,7 +70,7 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { let mut header_buf = Vec::new(); Cursor::new(&mut header_buf).write_le(header).unwrap(); - op_and_len(w, op::MESSAGE, header_buf.len() + data.len())?; + op_and_len(w, op::MESSAGE, (header_buf.len() + data.len()) as _)?; w.write_all(&header_buf)?; w.write_all(data)?; } @@ -80,21 +81,8 @@ fn write_record(w: &mut W, r: &Record) -> io::Result<()> { unreachable!("MessageIndexes handle their own serialization to recycle the buffer between indexes") } Record::ChunkIndex(c) => record!(op::CHUNK_INDEX, c), - Record::Attachment { header, data } => { - let mut header_buf = Vec::new(); - Cursor::new(&mut header_buf).write_le(header).unwrap(); - op_and_len( - w, - op::ATTACHMENT, - header_buf.len() + size_of::() + data.len() + size_of::(), /* crc */ - )?; - - let mut checksummer = CountingCrcWriter::new(w); - checksummer.write_all(&header_buf)?; - checksummer.write_u64::(data.len() as u64)?; - checksummer.write_all(data)?; - let (w, crc) = checksummer.finalize(); - w.write_u32::(crc)?; + Record::Attachment { .. 
} => { + unreachable!("Attachments handle their own serialization to handle large files") } Record::AttachmentIndex(ai) => record!(op::ATTACHMENT_INDEX, ai), Record::Statistics(s) => record!(op::STATISTICS, s), @@ -388,37 +376,93 @@ impl<'a, W: Write + Seek> Writer<'a, W> { Ok(()) } - pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> { - let header = records::AttachmentHeader { - log_time: attachment.log_time, - create_time: attachment.create_time, - name: attachment.name.clone(), - media_type: attachment.media_type.clone(), + /// Start writing an attachment. + /// + /// This is a low level API. It is recommended to use [`Self::attach`] instead. + /// + /// To start writing an attachment call this method with the [`AttachmentHeader`] as well as + /// the length of the attachment in bytes. It is important this length is exact otherwise the + /// writer will be left in an error state. + /// + /// This call should be followed by one or more calls to [`Self::put_attachment_bytes`]. + /// + /// Once all attachment bytes have been written the attachment must be completed with a call to + /// [`Self::finish_attachment`]. Failing to finish the attachment will leave the writer in an + /// error state. + pub fn start_attachment( + &mut self, + attachment_length: u64, + header: AttachmentHeader, + ) -> McapResult<()> { + self.finish_chunk()?; + + let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); + + let WriteMode::Raw(w, chunk_mode) = prev_writer else { + panic!( + "since finish_chunk was called, write mode is guaranteed to be raw at this point" + ); }; - // Attachments don't live in chunks. - let w = self.finish_chunk()?; + self.writer = Some(WriteMode::Attachment( + AttachmentWriter::new(w, attachment_length, header)?, + chunk_mode, + )); - let offset = w.stream_position()?; + Ok(()) + } - write_record( - w, - &Record::Attachment { - header, - data: Cow::Borrowed(&attachment.data), - }, - )?; + /// Write bytes to the current attachment. 
+ /// + /// This is a low level API. It is recommended to use [`Self::attach`] instead. + /// + /// Before calling this method call [`Self::start_attachment`]. + pub fn put_attachment_bytes(&mut self, bytes: &[u8]) -> McapResult<()> { + let Some(WriteMode::Attachment(writer, _)) = &mut self.writer else { + return Err(McapError::AttachmentNotInProgress); + }; - let length = w.stream_position()? - offset; - self.attachment_indexes.push(records::AttachmentIndex { - offset, - length, + writer.put_bytes(bytes)?; + + Ok(()) + } + + /// Finish the current attachment. + /// + /// This is a low level API. It is recommended to use [`Self::attach`] instead. + /// + /// Before calling this method call [`Self::start_attachment`] and write bytes to the + /// attachment using [`Self::put_attachment_bytes`]. + pub fn finish_attachment(&mut self) -> McapResult<()> { + let Some(WriteMode::Attachment(..)) = &mut self.writer else { + return Err(McapError::AttachmentNotInProgress); + }; + + let Some(WriteMode::Attachment(writer, chunk_mode)) = self.writer.take() else { + panic!("WriteMode is guaranteed to be attachment by this point"); + }; + + let (writer, attachment_index) = writer.finish()?; + + self.attachment_indexes.push(attachment_index); + + self.writer = Some(WriteMode::Raw(writer, chunk_mode)); + + Ok(()) + } + + /// Write an attachment to the MCAP file + pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> { + let header = records::AttachmentHeader { log_time: attachment.log_time, create_time: attachment.create_time, - data_size: attachment.data.len() as u64, name: attachment.name.clone(), media_type: attachment.media_type.clone(), - }); + }; + + self.start_attachment(attachment.data.len() as _, header)?; + self.put_attachment_bytes(&attachment.data[..])?; + self.finish_attachment()?; Ok(()) } @@ -467,6 +511,12 @@ impl<'a, W: Write + Seek> Writer<'a, W> { /// Starts a new chunk if we haven't done so already. 
fn chunkin_time(&mut self) -> McapResult<&mut ChunkWriter> { + // It is not possible to start writing a chunk if we're still writing an attachment. Return + // an error instead. + if let Some(WriteMode::Attachment(..)) = self.writer { + return Err(McapError::AttachmentInProgress); + } + // Some Rust tricky: we can't move the writer out of self.writer, // leave that empty for a bit, and then replace it with a ChunkWriter. // (That would leave it in an unspecified state if we bailed here!) @@ -487,14 +537,21 @@ impl<'a, W: Write + Seek> Writer<'a, W> { chunk => chunk, }); - match &mut self.writer { - Some(WriteMode::Chunk(c)) => Ok(c), - _ => unreachable!(), - } + let Some(WriteMode::Chunk(c)) = &mut self.writer else { + unreachable!("we're not in an attachment and write mode was set to chunk above") + }; + + Ok(c) } /// Finish the current chunk, if we have one. fn finish_chunk(&mut self) -> McapResult<&mut W> { + // If we're currently writing an attachment then we're not writing a chunk. Return an + // error instead. + if let Some(WriteMode::Attachment(..)) = self.writer { + return Err(McapError::AttachmentInProgress); + } + // See above let prev_writer = self.writer.take().expect(Self::WHERE_WRITER); @@ -504,13 +561,14 @@ impl<'a, W: Write + Seek> Writer<'a, W> { self.chunk_indexes.push(index); WriteMode::Raw(w, mode) } - raw => raw, + mode => mode, }); - match &mut self.writer { - Some(WriteMode::Raw(w, _)) => Ok(w), - _ => unreachable!(), - } + let Some(WriteMode::Raw(w, _)) = &mut self.writer else { + unreachable!("we're not in an attachment and write mode raw was set above") + }; + + Ok(w) } /// Finishes any current chunk and writes out the rest of the file. 
@@ -932,7 +990,7 @@ impl ChunkWriter { let mut sink = stream.finish()?; let data_end = sink.stream_position()?; let compressed_size = data_end - self.data_start; - let record_size = (data_end - self.header_start) as usize - 9; // 1 byte op, 8 byte len + let record_size = (data_end - self.header_start) - 9; // 1 byte op, 8 byte len // Back up, write our finished header, then continue at the end of the stream. sink.seek(SeekFrom::Start(self.header_start))?; @@ -976,7 +1034,7 @@ impl ChunkWriter { }; Cursor::new(&mut index_buf).write_le(&index)?; - op_and_len(&mut sink, op::MESSAGE_INDEX, index_buf.len())?; + op_and_len(&mut sink, op::MESSAGE_INDEX, index_buf.len() as _)?; sink.write_all(&index_buf)?; } let end_of_indexes = sink.stream_position()?; @@ -1005,3 +1063,100 @@ impl ChunkWriter { Ok((writer, mode, index)) } } + +struct AttachmentWriter { + record_offset: u64, + attachment_offset: u64, + attachment_length: u64, + header: AttachmentHeader, + writer: CountingCrcWriter, +} + +impl AttachmentWriter { + /// Create a new [`AttachmentWriter`] and write the attachment header to the output. + fn new(mut writer: W, attachment_length: u64, header: AttachmentHeader) -> McapResult { + let record_offset = writer.stream_position()?; + + // We have to write to a temporary buffer here as the CountingCrcWriter doesn't support + // seeking. + let mut header_buf = vec![]; + Cursor::new(&mut header_buf).write_le(&header)?; + + op_and_len( + &mut writer, + op::ATTACHMENT, + header_buf.len() as u64 + // attachment_length + + size_of::() as u64 + // attachment + + attachment_length + // crc + + size_of::() as u64, + )?; + + let mut writer = CountingCrcWriter::new(writer); + writer.write_all(&header_buf)?; + writer.write_u64::(attachment_length)?; + + let attachment_offset = writer.position(); + + Ok(Self { + record_offset, + attachment_offset, + attachment_length, + header, + writer, + }) + } + + /// Write bytes to the attachment. 
+ /// + /// This method will return an error if the provided bytes exceed the space remaining in the + /// attachment. + fn put_bytes(&mut self, bytes: &[u8]) -> McapResult<()> { + let attachment_position = self.writer.position() - self.attachment_offset; + + let space = self.attachment_length - attachment_position; + let byte_length = bytes.len() as u64; + + if byte_length > space { + return Err(McapError::AttachmentTooLarge { + excess: byte_length - space, + attachment_length: self.attachment_length, + }); + } + + self.writer.write_all(bytes)?; + Ok(()) + } + + /// Finish the attachment and write the CRC to the output, returning the [`AttachmentIndex`] + /// for the written attachment. + fn finish(self) -> McapResult<(W, AttachmentIndex)> { + let expected = self.attachment_length; + let current = self.writer.position() - self.attachment_offset; + + if expected != current { + return Err(McapError::AttachmentIncomplete { expected, current }); + } + + let (mut writer, crc) = self.writer.finalize(); + writer.write_u32::(crc)?; + + let offset = self.record_offset; + let length = writer.stream_position()? 
- offset; + + Ok(( + writer, + AttachmentIndex { + offset, + length, + log_time: self.header.log_time, + media_type: self.header.media_type, + name: self.header.name, + create_time: self.header.create_time, + data_size: self.attachment_length, + }, + )) + } +} diff --git a/rust/tests/attachment.rs b/rust/tests/attachment.rs index 57eb98b1e..d57e405b5 100644 --- a/rust/tests/attachment.rs +++ b/rust/tests/attachment.rs @@ -1,6 +1,7 @@ mod common; use common::*; +use mcap::records::AttachmentHeader; use std::{borrow::Cow, io::BufWriter}; @@ -33,6 +34,69 @@ fn smoke() -> Result<()> { Ok(()) } +#[test] +fn test_attach_in_multiple_parts() -> Result<()> { + let mut tmp = tempfile()?; + let mut writer = mcap::Writer::new(BufWriter::new(&mut tmp))?; + + let data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + let (left, right) = data.split_at(5); + + writer.start_attachment( + 10, + AttachmentHeader { + log_time: 100, + create_time: 200, + name: "great-attachment".into(), + media_type: "application/octet-stream".into(), + }, + )?; + + writer.put_attachment_bytes(left)?; + writer.put_attachment_bytes(right)?; + + writer.finish_attachment()?; + + drop(writer); + + let ours = unsafe { Mmap::map(&tmp) }?; + let summary = mcap::Summary::read(&ours)?; + + let expected_summary = Some(mcap::Summary { + stats: Some(mcap::records::Statistics { + attachment_count: 1, + ..Default::default() + }), + attachment_indexes: vec![mcap::records::AttachmentIndex { + // offset depends on the length of the embedded library string, which includes the crate version + offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64), + length: 95, + log_time: 100, + create_time: 200, + data_size: 10, + name: "great-attachment".into(), + media_type: "application/octet-stream".into(), + }], + ..Default::default() + }); + assert_eq!(summary, expected_summary); + + let expected_attachment = mcap::Attachment { + log_time: 100, + create_time: 200, + name: "great-attachment".into(), + media_type: 
"application/octet-stream".into(), + data: Cow::Borrowed(&[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]), + }; + + assert_eq!( + mcap::read::attachment(&ours, &summary.unwrap().attachment_indexes[0])?, + expected_attachment + ); + + Ok(()) +} + #[test] fn round_trip() -> Result<()> { let mapped = map_mcap("../tests/conformance/data/OneAttachment/OneAttachment.mcap")?;