diff --git a/rust/.gdb_history b/rust/.gdb_history new file mode 100644 index 000000000..0eafdaee8 --- /dev/null +++ b/rust/.gdb_history @@ -0,0 +1,35 @@ +main +run test_decompression_does_not_fail +pause +break_all +pause all +continue +break +break +main +pause +run +break +break +break +continue +reak +break +break +pause +stop +stop +stop +kill +ruyn +run +exit +run +break +run +run +break +pause +stop +stop +run diff --git a/rust/examples/loop.rs b/rust/examples/loop.rs new file mode 100644 index 000000000..264ebeb33 --- /dev/null +++ b/rust/examples/loop.rs @@ -0,0 +1,23 @@ +use std::io::Read; + +use mcap::sans_io::read::{LinearReader, ReadAction}; + +fn main() { + let mut f = std::fs::File::open("tests/data/break_zstd_decompression.mcap") + .expect("failed to open file"); + let blocksize: usize = 1024; + let mut reader = LinearReader::new(); + while let Some(action) = reader.next_action() { + match action.expect("failed to get next action") { + ReadAction::GetRecord { data: _, opcode } => { + print!("{},", opcode); + } + ReadAction::NeedMore(_) => { + let read = f + .read(reader.insert(blocksize)) + .expect("failed to read from file"); + reader.set_written(read); + } + } + } +} diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index dd2ae43bd..56509fed1 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -5,6 +5,7 @@ use std::collections::HashMap; use super::decompressor::Decompressor; use crate::{ records::{op, ChunkHeader}, + sans_io::decompressor::DecompressResult, McapError, McapResult, MAGIC, }; use binrw::BinReaderExt; @@ -362,11 +363,12 @@ impl LinearReader { &mut self.decompressed_content, $remaining, ) { - Ok(None) => { + Ok(DecompressStatus::Filled) => { &self.decompressed_content.data [self.decompressed_content.start..self.decompressed_content.start + $n] } - Ok(Some(n)) => return Some(Ok(ReadAction::NeedMore(n))), + Ok(DecompressStatus::Eof) => continue, + Ok(DecompressStatus::Need(n)) => return Some(Ok(ReadAction::NeedMore(n))), Err(err) => return Some(Err(err)), } }}; @@ -698,6 +700,12 @@ fn get_decompressor( } } +enum DecompressStatus { + Eof, + Need(usize), + Filled, +} + // decompresses up to `n` bytes from `from` into `to`. Repeatedly calls `decompress` until // either the input is exhausted or enough data has been written. Returns None if all required // data has been decompressed, or Some(need) if more bytes need to be read from the input. @@ -707,27 +715,40 @@ fn decompress_inner( src_buf: &mut RwBuf, dest_buf: &mut RwBuf, compressed_remaining: &mut u64, -) -> McapResult> { +) -> McapResult { if dest_buf.len() >= n { - return Ok(None); + return Ok(DecompressStatus::Filled); } + dest_buf.data.resize(dest_buf.start + n, 0); + loop { let need = decompressor.next_read_size(); let have = src_buf.len(); + if need > have { - return Ok(Some(need - have)); + return Ok(DecompressStatus::Need(need - have)); } + let dst = &mut dest_buf.data[dest_buf.end..]; + if dst.is_empty() { - return Ok(None); + return Ok(DecompressStatus::Filled); } + let src_len = std::cmp::min(have, clamp_to_usize(*compressed_remaining)); + let src = &src_buf.data[src_buf.start..src_buf.start + src_len]; + let res = decompressor.decompress(src, dst)?; src_buf.mark_read(res.consumed); dest_buf.end += res.wrote; + *compressed_remaining -= res.consumed as u64; + + if *compressed_remaining == 0 { + return Ok(DecompressStatus::Eof); + } } }