Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow nested parsers on MessageProducer level #2077

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion application/apps/indexer/addons/dlt-tools/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn scan_dlt_ft(
let reader = BufReader::new(&input);
let source = BinaryByteSource::new(reader);
let parser = DltParser::new(filter.map(|f| f.into()), None, None, with_storage_header);
let mut producer = MessageProducer::new(parser, source, None);
let mut producer = MessageProducer::new(parser, source, Vec::new(), None);
let stream = producer.as_stream();
pin_mut!(stream);

Expand Down
2 changes: 1 addition & 1 deletion application/apps/indexer/indexer_cli/src/interactive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub(crate) async fn handle_interactive_session(input: Option<PathBuf>) {
static RECEIVER: &str = "127.0.0.1:5000";
let udp_source = UdpSource::new(RECEIVER, vec![]).await.unwrap();
let dlt_parser = DltParser::new(None, None, None, false);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, None);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, udp_source, Vec::new(), None);
let msg_stream = dlt_msg_producer.as_stream();
pin_mut!(msg_stream);
loop {
Expand Down
14 changes: 8 additions & 6 deletions application/apps/indexer/indexer_cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ pub async fn main() -> Result<()> {
let dlt_parser = DltParser::new(None, None, None, true);
let reader = BufReader::new(&in_file);
let source = BinaryByteSource::new(reader);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, Vec::new(), None);
let cancel = CancellationToken::new();
export_raw(
Box::pin(dlt_msg_producer.as_stream()),
Expand Down Expand Up @@ -1371,7 +1371,7 @@ async fn count_dlt_messages(input: &Path) -> Result<u64, DltParseError> {

let source = BinaryByteSource::new(second_reader);

let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, Vec::new(), None);
let msg_stream = dlt_msg_producer.as_stream();
Ok(msg_stream.count().await as u64)
} else {
Expand All @@ -1388,7 +1388,7 @@ async fn detect_messages_type(input: &Path) -> Result<bool, DltParseError> {
let buf_reader = BufReader::new(fs::File::open(input)?);
let source = BinaryByteSource::new(buf_reader);
let dlt_parser = DltRangeParser::new();
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None);
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, Vec::new(), None);
let msg_stream = dlt_msg_producer.as_stream();
pin_mut!(msg_stream);
let mut item_count = 0usize;
Expand Down Expand Up @@ -1434,7 +1434,8 @@ async fn detect_messages_type(input: &Path) -> Result<bool, DltParseError> {
let some_parser = SomeipParser::new();
match PcapngByteSource::new(fs::File::open(input)?) {
Ok(source) => {
let mut some_msg_producer = MessageProducer::new(some_parser, source, None);
let mut some_msg_producer =
MessageProducer::new(some_parser, source, Vec::new(), None);
let msg_stream = some_msg_producer.as_stream();
pin_mut!(msg_stream);
let mut item_count = 0usize;
Expand Down Expand Up @@ -1473,7 +1474,8 @@ async fn detect_messages_type(input: &Path) -> Result<bool, DltParseError> {
// let buf_reader = BufReader::new(fs::File::open(&input)?);
match PcapngByteSource::new(fs::File::open(input)?) {
Ok(source) => {
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, None);
let mut dlt_msg_producer =
MessageProducer::new(dlt_parser, source, Vec::new(), None);
let msg_stream = dlt_msg_producer.as_stream();
pin_mut!(msg_stream);
let mut item_count = 0usize;
Expand Down Expand Up @@ -1525,7 +1527,7 @@ async fn detect_messages_type(input: &Path) -> Result<bool, DltParseError> {
let txt_parser = StringTokenizer {};
let buf_reader = BufReader::new(fs::File::open(input)?);
let source = BinaryByteSource::new(buf_reader);
let mut txt_msg_producer = MessageProducer::new(txt_parser, source, None);
let mut txt_msg_producer = MessageProducer::new(txt_parser, source, Vec::new(), None);
let msg_stream = txt_msg_producer.as_stream();
pin_mut!(msg_stream);
let mut item_count = 0usize;
Expand Down
13 changes: 11 additions & 2 deletions application/apps/indexer/parsers/src/dlt/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod attachment;
pub mod fmt;

use crate::{dlt::fmt::FormattableMessage, Error, LogMessage, ParseYield, Parser};
use crate::{dlt::fmt::FormattableMessage, Error, LogMessage, ParseYield, Parser, ParserAlias};
use byteorder::{BigEndian, WriteBytesExt};
pub use dlt_core::{
dlt::LogLevel,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl LogMessage for RawMessage {
}
}

#[derive(Default)]
#[derive(Default, Debug)]
pub struct DltParser<'m> {
pub filter_config: Option<ProcessedDltFilterConfig>,
pub fibex_metadata: Option<&'m FibexMetadata>,
Expand Down Expand Up @@ -120,6 +120,7 @@ impl<'m> Parser<FormattableMessage<'m>> for DltParser<'m> {
&mut self,
input: &'b [u8],
timestamp: Option<u64>,
mut nested: impl FnMut(&'b [u8], ParserAlias) -> Option<String>,
) -> Result<(&'b [u8], Option<ParseYield<FormattableMessage<'m>>>), Error> {
match dlt_message(input, self.filter_config.as_ref(), self.with_storage_header)
.map_err(|e| Error::Parse(format!("{e}")))?
Expand All @@ -145,6 +146,12 @@ impl<'m> Parser<FormattableMessage<'m>> for DltParser<'m> {
options: self.fmt_options,
};
self.offset += input.len() - rest.len();

debug_assert!({
let _ = nested(&[0, 0, 0, 0, 0], ParserAlias::SomeIp);
true
});

Ok((
rest,
if let Some(attachment) = attachment {
Expand All @@ -163,6 +170,7 @@ impl Parser<RangeMessage> for DltRangeParser {
&mut self,
input: &'b [u8],
_timestamp: Option<u64>,
_nested: impl FnMut(&'b [u8], ParserAlias) -> Option<String>,
) -> Result<(&'b [u8], Option<ParseYield<RangeMessage>>), Error> {
let (rest, consumed) = dlt_consume_msg(input).map_err(|e| Error::Parse(format!("{e}")))?;
let msg = consumed.map(|c| {
Expand All @@ -183,6 +191,7 @@ impl Parser<RawMessage> for DltRawParser {
&mut self,
input: &'b [u8],
_timestamp: Option<u64>,
_nested: impl FnMut(&'b [u8], ParserAlias) -> Option<String>,
) -> Result<(&'b [u8], Option<ParseYield<RawMessage>>), Error> {
let (rest, consumed) = dlt_consume_msg(input).map_err(|e| Error::Parse(format!("{e}")))?;
let msg = consumed.map(|c| RawMessage {
Expand Down
10 changes: 10 additions & 0 deletions application/apps/indexer/parsers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,15 @@ use thiserror::Error;

extern crate log;

/// A concrete, owned parser instance tagged by the kind of parser it holds.
///
/// NOTE(review): presumably these are the nested parsers supplied to
/// `MessageProducer::new` through its new `Vec` argument; the use site is not
/// visible here, so confirm before relying on this.
#[derive(Debug)]
pub enum ParserKind {
/// Holds a SOME/IP parser instance.
SomeIp(someip::SomeipParser),
}

/// Data-free tag naming which nested parser a caller wants to run.
///
/// A value of this type is passed by value as the second argument of the
/// `nested` callback accepted by `Parser::parse`. Because it carries no data it
/// derives `Copy`, so one alias can be handed to the callback repeatedly, and
/// `Debug`/`PartialEq`/`Eq`/`Hash` so it can be logged, compared and used as a
/// lookup key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ParserAlias {
    /// Refers to the SOME/IP parser.
    // NOTE(review): presumably resolved to `ParserKind::SomeIp`; the mapping
    // is not visible here, so confirm.
    SomeIp,
}

#[derive(Error, Debug)]
pub enum Error {
#[error("Parse error: {0}")]
Expand Down Expand Up @@ -44,6 +53,7 @@ pub trait Parser<T> {
&mut self,
input: &'a [u8],
timestamp: Option<u64>,
nested: impl FnMut(&'a [u8], ParserAlias) -> Option<String>,
) -> Result<(&'a [u8], Option<ParseYield<T>>), Error>;
}

Expand Down
52 changes: 43 additions & 9 deletions application/apps/indexer/parsers/src/someip.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Error, LogMessage, ParseYield, Parser};
use crate::{Error, LogMessage, ParseYield, Parser, ParserAlias};
use std::{borrow::Cow, fmt, fmt::Display, io::Write, path::PathBuf};

use someip_messages::*;
Expand All @@ -12,6 +12,7 @@ use log::{debug, error};
use serde::Serialize;

/// A parser for SOME/IP log messages.
#[derive(Debug)]
pub struct SomeipParser {
/// Optional FIBEX model. The tests in this file build parsers both with
/// `Some(model)` and through `SomeipParser::new()`.
// NOTE(review): presumably the model is used to decode message payloads; confirm.
model: Option<FibexModel>,
}
Expand Down Expand Up @@ -53,6 +54,7 @@ impl Parser<SomeipLogMessage> for SomeipParser {
&mut self,
input: &'a [u8],
timestamp: Option<u64>,
_nested: impl FnMut(&'a [u8], ParserAlias) -> Option<String>,
) -> Result<(&'a [u8], Option<ParseYield<SomeipLogMessage>>), Error> {
let time = timestamp.unwrap_or(0);
match Message::from_slice(input) {
Expand Down Expand Up @@ -407,7 +409,11 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand All @@ -428,7 +434,11 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand All @@ -449,7 +459,11 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand All @@ -474,7 +488,11 @@ mod test {
let model = test_model();

let mut parser = SomeipParser { model: Some(model) };
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand All @@ -498,7 +516,11 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand All @@ -524,7 +546,11 @@ mod test {
let model = test_model();

let mut parser = SomeipParser { model: Some(model) };
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand All @@ -550,7 +576,11 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand Down Expand Up @@ -592,7 +622,11 @@ mod test {
];

let mut parser = SomeipParser::new();
let (output, message) = parser.parse(input, None).unwrap();
let (output, message) = parser
.parse(input, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();

assert!(output.is_empty());

Expand Down
23 changes: 19 additions & 4 deletions application/apps/indexer/parsers/src/text.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{Error, LogMessage, ParseYield, Parser};
use crate::{Error, LogMessage, ParseYield, Parser, ParserAlias};
use serde::Serialize;
use std::{fmt, io::Write};

Expand Down Expand Up @@ -31,6 +31,7 @@ where
&mut self,
input: &'b [u8],
_timestamp: Option<u64>,
_nested: impl FnMut(&'b [u8], ParserAlias) -> Option<String>,
) -> Result<(&'b [u8], Option<ParseYield<StringMessage>>), Error> {
// TODO: support non-utf8 encodings
use memchr::memchr;
Expand Down Expand Up @@ -58,18 +59,32 @@ where
fn test_string_tokenizer() {
let mut parser = StringTokenizer {};
let content = b"hello\nworld\n";
let (rest_1, first_msg) = parser.parse(content, None).unwrap();
let (rest_1, first_msg) = parser
.parse(
content,
None,
|_: &[u8], _: ParserAlias| -> Option<String> { None },
)
.unwrap();
match first_msg {
Some(ParseYield::Message(StringMessage { content })) if content.eq("hello") => {}
_ => panic!("First message did not match"),
}
println!("rest_1 = {:?}", String::from_utf8_lossy(rest_1));
let (rest_2, second_msg) = parser.parse(rest_1, None).unwrap();
let (rest_2, second_msg) = parser
.parse(rest_1, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();
match second_msg {
Some(ParseYield::Message(StringMessage { content })) if content.eq("world") => {}
_ => panic!("Second message did not match"),
}
let (rest_3, third_msg) = parser.parse(rest_2, None).unwrap();
let (rest_3, third_msg) = parser
.parse(rest_2, None, |_: &[u8], _: ParserAlias| -> Option<String> {
None
})
.unwrap();
println!("rest_3 = {:?}", String::from_utf8_lossy(rest_3));
assert!(third_msg.is_none());
}
2 changes: 1 addition & 1 deletion application/apps/indexer/processor/src/text_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl TextFileSource {
file_part: &FilePart,
) -> Result<Vec<String>, GrabError> {
Ok(String::from_utf8_lossy(read_buf)
.split(|c| c == '\n')
.split('\n')
.take(file_part.total_lines - file_part.lines_to_drop)
.skip(file_part.lines_to_skip)
.map(|s| s.to_string())
Expand Down
6 changes: 3 additions & 3 deletions application/apps/indexer/session/src/handlers/export_raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ async fn export<S: ByteSource>(
} else {
SomeipParser::new()
};
let mut producer = MessageProducer::new(parser, source, None);
let mut producer = MessageProducer::new(parser, source, Vec::new(), None);
export_runner(
Box::pin(producer.as_stream()),
dest,
Expand All @@ -163,7 +163,7 @@ async fn export<S: ByteSource>(
fmt_options.as_ref(),
settings.with_storage_header,
);
let mut producer = MessageProducer::new(parser, source, None);
let mut producer = MessageProducer::new(parser, source, Vec::new(), None);
export_runner(
Box::pin(producer.as_stream()),
dest,
Expand All @@ -175,7 +175,7 @@ async fn export<S: ByteSource>(
.await
}
ParserType::Text => {
let mut producer = MessageProducer::new(StringTokenizer {}, source, None);
let mut producer = MessageProducer::new(StringTokenizer {}, source, Vec::new(), None);
export_runner(
Box::pin(producer.as_stream()),
dest,
Expand Down
Loading
Loading