Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async implementation of the AC automaton with futures crate (fully optional) #132

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ bench_large/huge
BREADCRUMBS
/tmp
/aho-corasick-debug/Cargo.lock
.vscode
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ name = "aho_corasick"
[features]
default = ["std", "perf-literal"]
std = ["memchr?/std"]
async = ["dep:futures", "dep:pin-project-lite"]

# Enables prefilter optimizations that depend on external crates.
perf-literal = ["dep:memchr"]
Expand Down Expand Up @@ -50,6 +51,8 @@ logging = ["dep:log"]
[dependencies]
log = { version = "0.4.17", optional = true }
memchr = { version = "2.4.0", default-features = false, optional = true }
futures = { version = "0.3.28", optional = true }
pin-project-lite = { version = "0.2.13", optional = true }

[dev-dependencies]
doc-comment = "0.3.3"
Expand Down
160 changes: 159 additions & 1 deletion src/ahocorasick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ use crate::{
},
};

#[cfg(all(feature = "async", feature = "std"))]
use crate::r#async::{
reader::AhoCorasickAsyncReader, writer::AhoCorasickAsyncWriter,
};

/// An automaton for searching multiple strings in linear time.
///
/// The `AhoCorasick` type supports a few basic ways of constructing an
Expand Down Expand Up @@ -1841,6 +1846,159 @@ impl AhoCorasick {
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
self.aut.try_stream_replace_all_with(rdr, wtr, replace_with)
}

/// Obtain AhoCorasickAsyncReader wrapping an original AsyncRead source
/// Reading from this new reader will yield chunks with patterns already replaced
/// Poll will only return Ok(0) if the poll to the original source also returned 0 bytes.
///
/// # Example: basic usage
///
/// ```
/// use aho_corasick::AhoCorasick;
/// use futures::AsyncReadExt;
///
/// let patterns = &["fox", "brown", "quick"];
/// let replacements = &["bear", "white", "slow"];
/// let haystack = futures::io::Cursor::new("The quick brown fox.");
///
/// let ac = AhoCorasick::new(patterns).unwrap();
/// let mut ac_async_reader = ac.async_reader(haystack, replacements).unwrap();
/// let mut result = String::new();
/// futures::executor::block_on(async {
/// ac_async_reader.read_to_string(&mut result).await.unwrap();
/// });
/// assert_eq!(&result, "The slow white bear.");
/// ```
#[cfg(all(feature = "async", feature = "std"))]
pub fn async_reader<'a, R, B>(
&self,
source: R,
replace_with: &'a [B],
) -> Result<AhoCorasickAsyncReader<'a, R, B>, std::io::Error>
where
R: futures::AsyncRead,
B: AsRef<[u8]> + 'a,
{
assert_eq!(
replace_with.len(),
self.patterns_len(),
"async_reader requires a replacement for every pattern \
in the automaton",
);
enforce_anchored_consistency(self.start_kind, Anchored::No)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

AhoCorasickAsyncReader::new(
Arc::clone(&self.aut),
source,
replace_with,
)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

/// Obtain AhoCorasickAsyncWriter wrapping an original AsyncWrite sink.
/// Writing to this new writer will perform the replacements before sending the bytes to your sink
///
/// # Example: basic usage
///
/// ```
/// use aho_corasick::AhoCorasick;
/// use futures::{AsyncReadExt, AsyncWriteExt};
///
/// let patterns = &["fox", "brown", "quick"];
/// let replacements = &["bear", "white", "slow"];
/// let mut haystack = futures::io::Cursor::new("The quick brown fox.");
///
/// let ac = AhoCorasick::new(patterns).unwrap();
/// let mut result: futures::io::Cursor<Vec<u8>> = futures::io::Cursor::new(Vec::new());
/// let mut ac_async_writer = ac.async_writer(&mut result, replacements).unwrap();
/// futures::executor::block_on(async {
/// let mut buf = [0u8; 10];
/// loop {
/// let bytes_read = haystack.read(&mut buf).await.unwrap();
/// if bytes_read > 0 {
/// ac_async_writer.write(&buf[..bytes_read]).await.unwrap();
/// } else {
/// ac_async_writer.close().await.unwrap();
/// break;
/// }
/// }
/// });
/// assert_eq!(&String::from_utf8(result.get_ref().to_vec()).unwrap(), "The slow white bear.");
/// ```
#[cfg(all(feature = "async", feature = "std"))]
pub fn async_writer<'a, W, B>(
&self,
sink: W,
replace_with: &'a [B],
) -> Result<AhoCorasickAsyncWriter<'a, W, B>, std::io::Error>
where
W: futures::AsyncWrite,
B: AsRef<[u8]> + 'a,
{
assert_eq!(
replace_with.len(),
self.patterns_len(),
"async_writer requires a replacement for every pattern \
in the automaton",
);
enforce_anchored_consistency(self.start_kind, Anchored::No)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

AhoCorasickAsyncWriter::new(Arc::clone(&self.aut), sink, replace_with)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
}

/// Helper method to read everything from the given AsyncRead, perform the replacements
/// and write the chunks to the given AsyncWrite. Specify a buffer size which suits your case the best
///
/// This method does nothing more than using an async_writer, an provide a convenient replacement loop
/// It could very well be manually implemented by the consumer, but provides an async alternative to
/// the existing try_stream_replace_all
#[cfg(all(feature = "async", feature = "std"))]
pub async fn try_async_stream_replace_all<R, W, B>(
&self,
reader: R,
writer: W,
replace_with: &[B],
buffer_size: usize,
) -> Result<(), std::io::Error>
where
R: futures::AsyncRead,
W: futures::AsyncWrite,
B: AsRef<[u8]>,
{
assert_eq!(
replace_with.len(),
self.patterns_len(),
"try_async_stream_replace_all requires a replacement for every pattern \
in the automaton",
);
enforce_anchored_consistency(self.start_kind, Anchored::No)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;

let mut buffer = alloc::vec![b'\0'; buffer_size];
let ac_writer = self.async_writer(writer, replace_with)?;

let mut pinned_reader = alloc::boxed::Box::pin(reader);
let mut pinned_writer = alloc::boxed::Box::pin(ac_writer);
loop {
let bytes_read =
futures::AsyncReadExt::read(&mut pinned_reader, &mut buffer)
.await?;
if bytes_read == 0 {
futures::AsyncWriteExt::close(&mut pinned_writer).await?;
break;
} else {
futures::AsyncWriteExt::write(
&mut pinned_writer,
&buffer[..bytes_read],
)
.await?;
}
}
Ok(())
}
}

/// Routines for querying information about the Aho-Corasick automaton.
Expand Down Expand Up @@ -2640,7 +2798,7 @@ pub enum AhoCorasickKind {
/// there is no borrowed data. Without these, the main `AhoCorasick` type would
/// not be able to meaningfully impl `Debug` or the marker traits without also
/// requiring that all impls of `Automaton` do so, which would be not great.
trait AcAutomaton:
pub(crate) trait AcAutomaton:
Automaton + Debug + Send + Sync + UnwindSafe + RefUnwindSafe + 'static
{
}
Expand Down
5 changes: 5 additions & 0 deletions src/async/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pub(crate) mod reader;
pub(crate) mod writer;

#[cfg(test)]
mod tests;
Loading