Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
rust: write attachments in multiple parts
Browse files Browse the repository at this point in the history
bennetthardwick committed Dec 5, 2024
1 parent 6631fbc commit 742c3c7
Showing 3 changed files with 281 additions and 54 deletions.
8 changes: 8 additions & 0 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
@@ -89,6 +89,14 @@ use thiserror::Error;

#[derive(Debug, Error)]
pub enum McapError {
#[error("tried to write to output while attachment is in progress")]
AttachmentInProgress,
#[error("tried to write bytes to an attachment but no attachment was in progress")]
AttachmentNotInProgress,
#[error("tried to write {excess} more bytes to attachment than the requested attachment length {attachment_length}")]
AttachmentTooLarge { excess: u64, attachment_length: u64 },
#[error("tried to finish writing attachment but current length {current} was not expected length {expected}")]
AttachmentIncomplete { current: u64, expected: u64 },
#[error("Bad magic number")]
BadMagic,
#[error("Footer record couldn't be found at the end of the file, before the magic bytes")]
263 changes: 209 additions & 54 deletions rust/src/write.rs
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@ use byteorder::{WriteBytesExt, LE};
use crate::{
chunk_sink::{ChunkMode, ChunkSink},
io_utils::CountingCrcWriter,
records::{self, op, MessageHeader, Record},
records::{self, op, AttachmentHeader, AttachmentIndex, MessageHeader, Record},
Attachment, Channel, Compression, McapError, McapResult, Message, Schema, MAGIC,
};

@@ -25,11 +25,12 @@ pub use records::Metadata;
enum WriteMode<W: Write + Seek> {
Raw(W, ChunkMode),
Chunk(ChunkWriter<W>),
Attachment(AttachmentWriter<W>, ChunkMode),
}

fn op_and_len<W: Write>(w: &mut W, op: u8, len: usize) -> io::Result<()> {
fn op_and_len<W: Write>(w: &mut W, op: u8, len: u64) -> io::Result<()> {
w.write_u8(op)?;
w.write_u64::<LE>(len as u64)?;
w.write_u64::<LE>(len)?;
Ok(())
}

@@ -41,7 +42,7 @@ fn write_record<W: Write>(w: &mut W, r: &Record) -> io::Result<()> {
let mut rec_buf = Vec::new();
Cursor::new(&mut rec_buf).write_le($b).unwrap();

op_and_len(w, $op, rec_buf.len())?;
op_and_len(w, $op, rec_buf.len() as _)?;
w.write_all(&rec_buf)?;
}};
}
@@ -58,7 +59,7 @@ fn write_record<W: Write>(w: &mut W, r: &Record) -> io::Result<()> {
op_and_len(
w,
op::SCHEMA,
header_buf.len() + size_of::<u32>() + data.len(),
(header_buf.len() + size_of::<u32>() + data.len()) as _,
)?;
w.write_all(&header_buf)?;
w.write_u32::<LE>(data.len() as u32)?;
@@ -69,7 +70,7 @@ fn write_record<W: Write>(w: &mut W, r: &Record) -> io::Result<()> {
let mut header_buf = Vec::new();
Cursor::new(&mut header_buf).write_le(header).unwrap();

op_and_len(w, op::MESSAGE, header_buf.len() + data.len())?;
op_and_len(w, op::MESSAGE, (header_buf.len() + data.len()) as _)?;
w.write_all(&header_buf)?;
w.write_all(data)?;
}
@@ -80,21 +81,8 @@ fn write_record<W: Write>(w: &mut W, r: &Record) -> io::Result<()> {
unreachable!("MessageIndexes handle their own serialization to recycle the buffer between indexes")
}
Record::ChunkIndex(c) => record!(op::CHUNK_INDEX, c),
Record::Attachment { header, data } => {
let mut header_buf = Vec::new();
Cursor::new(&mut header_buf).write_le(header).unwrap();
op_and_len(
w,
op::ATTACHMENT,
header_buf.len() + size_of::<u64>() + data.len() + size_of::<u32>(), /* crc */
)?;

let mut checksummer = CountingCrcWriter::new(w);
checksummer.write_all(&header_buf)?;
checksummer.write_u64::<LE>(data.len() as u64)?;
checksummer.write_all(data)?;
let (w, crc) = checksummer.finalize();
w.write_u32::<LE>(crc)?;
Record::Attachment { .. } => {
unreachable!("Attachments handle their own serialization to handle large files")
}
Record::AttachmentIndex(ai) => record!(op::ATTACHMENT_INDEX, ai),
Record::Statistics(s) => record!(op::STATISTICS, s),
@@ -388,37 +376,93 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
Ok(())
}

pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
let header = records::AttachmentHeader {
log_time: attachment.log_time,
create_time: attachment.create_time,
name: attachment.name.clone(),
media_type: attachment.media_type.clone(),
/// Start writing an attachment.
///
/// This is a low level API. It is recommended to use [`Self::attach`] instead.
///
/// To start writing an attachment call this method with the [`AttachmentHeader`] as well as
/// the length of the attachment in bytes. It is important this length is exact otherwise the
/// writer will be left in an error state.
///
/// This call should be followed by one or more calls to [`Self::put_attachment_bytes`].
///
/// Once all attachment bytes have been written the attachment must be completed with a call to
/// [`Self::finish_attachment`]. Failing to finish the attachment will leave the writer in an
/// error state.
pub fn start_attachment(
&mut self,
attachment_length: u64,
header: AttachmentHeader,
) -> McapResult<()> {
self.finish_chunk()?;

let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

let WriteMode::Raw(w, chunk_mode) = prev_writer else {
panic!(
"since finish_chunk was called, write mode is guaranteed to be raw at this point"
);
};

// Attachments don't live in chunks.
let w = self.finish_chunk()?;
self.writer = Some(WriteMode::Attachment(
AttachmentWriter::new(w, attachment_length, header)?,
chunk_mode,
));

let offset = w.stream_position()?;
Ok(())
}

write_record(
w,
&Record::Attachment {
header,
data: Cow::Borrowed(&attachment.data),
},
)?;
/// Write bytes to the current attachment.
///
/// This is a low level API. It is recommended to use [`Self::attach`] instead.
///
/// Before calling this method call [`Self::start_attachment`].
pub fn put_attachment_bytes(&mut self, bytes: &[u8]) -> McapResult<()> {
let Some(WriteMode::Attachment(writer, _)) = &mut self.writer else {
return Err(McapError::AttachmentNotInProgress);
};

let length = w.stream_position()? - offset;
self.attachment_indexes.push(records::AttachmentIndex {
offset,
length,
writer.put_bytes(bytes)?;

Ok(())
}

/// Complete the attachment that is currently being written.
///
/// This is a low level API. It is recommended to use [`Self::attach`] instead.
///
/// Call this after [`Self::start_attachment`], once all of the attachment's bytes have been
/// supplied through [`Self::put_attachment_bytes`].
///
/// # Errors
///
/// Returns [`McapError::AttachmentNotInProgress`] when the writer is not currently writing an
/// attachment; the writer state is left untouched in that case. An error from finishing the
/// attachment writer is propagated, and at that point the inner writer has already been taken
/// out of `self` and is not put back.
pub fn finish_attachment(&mut self) -> McapResult<()> {
    match self.writer.take() {
        Some(WriteMode::Attachment(attachment_writer, chunk_mode)) => {
            let (output, index) = attachment_writer.finish()?;

            self.attachment_indexes.push(index);
            // Attachment is done: go back to writing raw records to the output.
            self.writer = Some(WriteMode::Raw(output, chunk_mode));

            Ok(())
        }
        // Not writing an attachment: restore whatever was there and report the misuse.
        other => {
            self.writer = other;
            Err(McapError::AttachmentNotInProgress)
        }
    }
}

/// Write an attachment to the MCAP file
pub fn attach(&mut self, attachment: &Attachment) -> McapResult<()> {
let header = records::AttachmentHeader {
log_time: attachment.log_time,
create_time: attachment.create_time,
data_size: attachment.data.len() as u64,
name: attachment.name.clone(),
media_type: attachment.media_type.clone(),
});
};

self.start_attachment(attachment.data.len() as _, header)?;
self.put_attachment_bytes(&attachment.data[..])?;
self.finish_attachment()?;

Ok(())
}
@@ -467,6 +511,12 @@ impl<'a, W: Write + Seek> Writer<'a, W> {

/// Starts a new chunk if we haven't done so already.
fn chunkin_time(&mut self) -> McapResult<&mut ChunkWriter<W>> {
// A chunk cannot be started while an attachment is still being written. Report that an
// attachment is in progress (not the opposite condition) so the caller sees the real cause.
if let Some(WriteMode::Attachment(..)) = self.writer {
    return Err(McapError::AttachmentInProgress);
}

// Some Rust trickery: we can't move the writer out of self.writer,
// leave that empty for a bit, and then replace it with a ChunkWriter.
// (That would leave it in an unspecified state if we bailed here!)
@@ -487,14 +537,21 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
chunk => chunk,
});

match &mut self.writer {
Some(WriteMode::Chunk(c)) => Ok(c),
_ => unreachable!(),
}
let Some(WriteMode::Chunk(c)) = &mut self.writer else {
unreachable!("we're not in an attachment and write mode was set to chunk above")
};

Ok(c)
}

/// Finish the current chunk, if we have one.
fn finish_chunk(&mut self) -> McapResult<&mut W> {
// While an attachment is being written there is no chunk to finish and the raw writer is not
// available. Report that an attachment is in progress (not the opposite condition).
if let Some(WriteMode::Attachment(..)) = self.writer {
    return Err(McapError::AttachmentInProgress);
}

// See above
let prev_writer = self.writer.take().expect(Self::WHERE_WRITER);

@@ -504,13 +561,14 @@ impl<'a, W: Write + Seek> Writer<'a, W> {
self.chunk_indexes.push(index);
WriteMode::Raw(w, mode)
}
raw => raw,
mode => mode,
});

match &mut self.writer {
Some(WriteMode::Raw(w, _)) => Ok(w),
_ => unreachable!(),
}
let Some(WriteMode::Raw(w, _)) = &mut self.writer else {
unreachable!("we're not in an attachment and write mode raw was set above")
};

Ok(w)
}

/// Finishes any current chunk and writes out the rest of the file.
@@ -932,7 +990,7 @@ impl<W: Write + Seek> ChunkWriter<W> {
let mut sink = stream.finish()?;
let data_end = sink.stream_position()?;
let compressed_size = data_end - self.data_start;
let record_size = (data_end - self.header_start) as usize - 9; // 1 byte op, 8 byte len
let record_size = (data_end - self.header_start) - 9; // 1 byte op, 8 byte len

// Back up, write our finished header, then continue at the end of the stream.
sink.seek(SeekFrom::Start(self.header_start))?;
@@ -976,7 +1034,7 @@ impl<W: Write + Seek> ChunkWriter<W> {
};

Cursor::new(&mut index_buf).write_le(&index)?;
op_and_len(&mut sink, op::MESSAGE_INDEX, index_buf.len())?;
op_and_len(&mut sink, op::MESSAGE_INDEX, index_buf.len() as _)?;
sink.write_all(&index_buf)?;
}
let end_of_indexes = sink.stream_position()?;
@@ -1005,3 +1063,100 @@ impl<W: Write + Seek> ChunkWriter<W> {
Ok((writer, mode, index))
}
}

/// Writes a single attachment record to the output in several steps: the record header on
/// creation, the attachment data through one or more `put_bytes` calls, and the CRC on `finish`.
struct AttachmentWriter<W> {
// Stream position of the output, captured before the record's opcode and length are written.
record_offset: u64,
// Position reported by the CRC writer right after the attachment header and the data-length
// prefix were written; data progress is measured relative to this value.
attachment_offset: u64,
// Number of attachment data bytes declared when the writer was created.
attachment_length: u64,
// Kept so its fields can be moved into the `AttachmentIndex` produced by `finish`.
header: AttachmentHeader,
// The output wrapped in a `CountingCrcWriter`; the header, length prefix and data all pass
// through it, and the CRC it yields on finalize is written after the data.
writer: CountingCrcWriter<W>,
}

impl<W: Write + Seek> AttachmentWriter<W> {
    /// Begin an attachment record on `writer`.
    ///
    /// Writes the record opcode and total record length, followed by the serialized attachment
    /// header and the data-length prefix. The attachment data itself is supplied afterwards
    /// through [`Self::put_bytes`].
    fn new(mut writer: W, attachment_length: u64, header: AttachmentHeader) -> McapResult<Self> {
        let record_offset = writer.stream_position()?;

        // The header is serialized into a scratch buffer first because its encoded size is
        // part of the record length, which has to be written before the header itself.
        let mut serialized_header = Vec::new();
        Cursor::new(&mut serialized_header).write_le(&header)?;

        let length_prefix_size = size_of::<u64>() as u64;
        let crc_size = size_of::<u32>() as u64;
        let record_length =
            serialized_header.len() as u64 + length_prefix_size + attachment_length + crc_size;
        op_and_len(&mut writer, op::ATTACHMENT, record_length)?;

        // Everything from the header onwards goes through the CRC writer.
        let mut crc_writer = CountingCrcWriter::new(writer);
        crc_writer.write_all(&serialized_header)?;
        crc_writer.write_u64::<LE>(attachment_length)?;

        Ok(Self {
            record_offset,
            // Data progress is measured relative to the position after the length prefix.
            attachment_offset: crc_writer.position(),
            attachment_length,
            header,
            writer: crc_writer,
        })
    }

    /// Append `bytes` to the attachment data.
    ///
    /// Returns [`McapError::AttachmentTooLarge`] without writing anything if `bytes` does not
    /// fit in the space left according to the declared attachment length.
    fn put_bytes(&mut self, bytes: &[u8]) -> McapResult<()> {
        let written = self.writer.position() - self.attachment_offset;
        let remaining = self.attachment_length - written;
        let incoming = bytes.len() as u64;

        if incoming <= remaining {
            self.writer.write_all(bytes)?;
            return Ok(());
        }

        Err(McapError::AttachmentTooLarge {
            excess: incoming - remaining,
            attachment_length: self.attachment_length,
        })
    }

    /// Close the attachment record by writing its CRC, handing back the underlying output
    /// together with the [`AttachmentIndex`] that describes the record.
    ///
    /// Returns [`McapError::AttachmentIncomplete`] if the number of data bytes written so far
    /// differs from the declared attachment length.
    fn finish(self) -> McapResult<(W, AttachmentIndex)> {
        let Self {
            record_offset,
            attachment_offset,
            attachment_length,
            header,
            writer,
        } = self;

        let current = writer.position() - attachment_offset;
        if current != attachment_length {
            return Err(McapError::AttachmentIncomplete {
                expected: attachment_length,
                current,
            });
        }

        let (mut output, crc) = writer.finalize();
        output.write_u32::<LE>(crc)?;

        // The record spans from its opcode up to and including the CRC just written.
        let length = output.stream_position()? - record_offset;

        let index = AttachmentIndex {
            offset: record_offset,
            length,
            log_time: header.log_time,
            create_time: header.create_time,
            data_size: attachment_length,
            name: header.name,
            media_type: header.media_type,
        };

        Ok((output, index))
    }
}
64 changes: 64 additions & 0 deletions rust/tests/attachment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod common;

use common::*;
use mcap::records::AttachmentHeader;

use std::{borrow::Cow, io::BufWriter};

@@ -33,6 +34,69 @@ fn smoke() -> Result<()> {
Ok(())
}

/// Writes one attachment through the low level start / put / finish API in two halves and
/// checks that both the summary and the attachment read back match what was written.
#[test]
fn test_attach_in_multiple_parts() -> Result<()> {
    let payload: [u8; 10] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    let mut tmp = tempfile()?;

    {
        let mut writer = mcap::Writer::new(BufWriter::new(&mut tmp))?;

        writer.start_attachment(
            10,
            AttachmentHeader {
                log_time: 100,
                create_time: 200,
                name: "great-attachment".into(),
                media_type: "application/octet-stream".into(),
            },
        )?;

        // Feed the payload in two halves to exercise the multi-part path.
        for part in [&payload[..5], &payload[5..]] {
            writer.put_attachment_bytes(part)?;
        }

        writer.finish_attachment()?;
        // The writer goes out of scope here, releasing its borrow of `tmp` before mapping.
    }

    let ours = unsafe { Mmap::map(&tmp) }?;
    let summary = mcap::Summary::read(&ours)?;

    let expected_index = mcap::records::AttachmentIndex {
        // offset depends on the length of the embedded library string, which includes the crate version
        offset: 33 + (env!("CARGO_PKG_VERSION").len() as u64),
        length: 95,
        log_time: 100,
        create_time: 200,
        data_size: 10,
        name: "great-attachment".into(),
        media_type: "application/octet-stream".into(),
    };
    let expected_summary = Some(mcap::Summary {
        stats: Some(mcap::records::Statistics {
            attachment_count: 1,
            ..Default::default()
        }),
        attachment_indexes: vec![expected_index],
        ..Default::default()
    });
    assert_eq!(summary, expected_summary);

    let expected_attachment = mcap::Attachment {
        log_time: 100,
        create_time: 200,
        name: "great-attachment".into(),
        media_type: "application/octet-stream".into(),
        data: Cow::Borrowed(&payload[..]),
    };
    let index = &summary.unwrap().attachment_indexes[0];
    assert_eq!(mcap::read::attachment(&ours, index)?, expected_attachment);

    Ok(())
}

#[test]
fn round_trip() -> Result<()> {
let mapped = map_mcap("../tests/conformance/data/OneAttachment/OneAttachment.mcap")?;

0 comments on commit 742c3c7

Please sign in to comment.