Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bennetthardwick committed Dec 19, 2024
1 parent 3503d2e commit 37fb262
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
35 changes: 35 additions & 0 deletions rust/.gdb_history
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
main
run test_decompression_does_not_fail
pause
break_all
pause all
continue
break
break
main
pause
run
break
break
break
continue
reak
break
break
pause
stop
stop
stop
kill
ruyn
run
exit
run
break
run
run
break
pause
stop
stop
run
23 changes: 23 additions & 0 deletions rust/examples/loop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use std::io::Read;

use mcap::sans_io::read::{LinearReader, ReadAction};

fn main() {
let mut f = std::fs::File::open("tests/data/break_zstd_decompression.mcap")
.expect("failed to open file");
let blocksize: usize = 1024;
let mut reader = LinearReader::new();
while let Some(action) = reader.next_action() {
match action.expect("failed to get next action") {
ReadAction::GetRecord { data: _, opcode } => {
print!("{},", opcode);
}
ReadAction::NeedMore(_) => {
let read = f
.read(reader.insert(blocksize))
.expect("failed to read from file");
reader.set_written(read);
}
}
}
}
33 changes: 27 additions & 6 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashMap;
use super::decompressor::Decompressor;
use crate::{
records::{op, ChunkHeader},
sans_io::decompressor::DecompressResult,
McapError, McapResult, MAGIC,
};
use binrw::BinReaderExt;
Expand Down Expand Up @@ -362,11 +363,12 @@ impl LinearReader {
&mut self.decompressed_content,
$remaining,
) {
Ok(None) => {
Ok(DecompressStatus::Filled) => {
&self.decompressed_content.data
[self.decompressed_content.start..self.decompressed_content.start + $n]
}
Ok(Some(n)) => return Some(Ok(ReadAction::NeedMore(n))),
Ok(DecompressStatus::Eof) => continue,
Ok(DecompressStatus::Need(n)) => return Some(Ok(ReadAction::NeedMore(n))),
Err(err) => return Some(Err(err)),
}
}};
Expand Down Expand Up @@ -698,6 +700,12 @@ fn get_decompressor(
}
}

enum DecompressStatus {
Eof,
Need(usize),
Filled,
}

// decompresses up to `n` bytes from `from` into `to`. Repeatedly calls `decompress` until
// either the input is exhausted or enough data has been written. Returns None if all required
// data has been decompressed, or Some(need) if more bytes need to be read from the input.
Expand All @@ -707,27 +715,40 @@ fn decompress_inner(
src_buf: &mut RwBuf,
dest_buf: &mut RwBuf,
compressed_remaining: &mut u64,
) -> McapResult<Option<usize>> {
) -> McapResult<DecompressStatus> {
if dest_buf.len() >= n {
return Ok(None);
return Ok(DecompressStatus::Filled);
}

dest_buf.data.resize(dest_buf.start + n, 0);

loop {
let need = decompressor.next_read_size();
let have = src_buf.len();

if need > have {
return Ok(Some(need - have));
return Ok(DecompressStatus::Need(need - have));
}

let dst = &mut dest_buf.data[dest_buf.end..];

if dst.is_empty() {
return Ok(None);
return Ok(DecompressStatus::Filled);
}

let src_len = std::cmp::min(have, clamp_to_usize(*compressed_remaining));

let src = &src_buf.data[src_buf.start..src_buf.start + src_len];

let res = decompressor.decompress(src, dst)?;
src_buf.mark_read(res.consumed);
dest_buf.end += res.wrote;

*compressed_remaining -= res.consumed as u64;

if *compressed_remaining == 0 {
return Ok(DecompressStatus::Eof);
}
}
}

Expand Down

0 comments on commit 37fb262

Please sign in to comment.