Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use futures 0.3 and tokio #4

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ license = "MIT"
travis-ci = {repository = "frugalos/ecpool"}

[dependencies]
fibers = "0.1"
fibers_tasque = "0.1"
futures = "0.1"
trackable = "0.2"
futures = "0.3"
tokio = { version = "1.0", features = ["sync"]}
tokio_tasque = { version = "0.1", path = "../tokio_tasque" }
trackable = "1.2"

[target.'cfg(unix)'.dependencies]
liberasurecode = "1.0.1"

[dev-dependencies]
clap = "2"
fibers_global = "0.1"
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Basic usage:
```rust
use ecpool::replica::ReplicaCoder;
use ecpool::{ErrorKind, ErasureCoderPool};
use futures::executor::block_on;
use std::num::NonZeroUsize;
use std::result::Result;
use trackable::error::{Failure, Failed};
Expand All @@ -51,30 +52,30 @@ let coder = ErasureCoderPool::new(ReplicaCoder::new(data_fragments, parity_fragm

// Encodes
let data = vec![0, 1, 2, 3];
let encoded = fibers_global::execute(coder.encode(data.clone()))?;
let encoded = block_on(coder.encode(data.clone()))?;

// Decodes
assert_eq!(
Some(&data),
fibers_global::execute(coder.decode(encoded[0..].to_vec()))
block_on(coder.decode(encoded[0..].to_vec()))
.as_ref()
.ok()
);
assert_eq!(
Some(&data),
fibers_global::execute(coder.decode(encoded[1..].to_vec()))
block_on(coder.decode(encoded[1..].to_vec()))
.as_ref()
.ok()
);
assert_eq!(
Some(&data),
fibers_global::execute(coder.decode(encoded[2..].to_vec()))
block_on(coder.decode(encoded[2..].to_vec()))
.as_ref()
.ok()
);
assert_eq!(
Err(ErrorKind::InvalidInput),
fibers_global::execute(coder.decode(encoded[3..].to_vec())).map_err(|e| *e.kind())
block_on(coder.decode(encoded[3..].to_vec())).map_err(|e| *e.kind())
);
```

Expand Down
19 changes: 8 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
//! Basic usage:
//! ```
//! # extern crate ecpool;
//! # extern crate fibers_global;
//! # extern crate futures;
//! # extern crate trackable;
//! use ecpool::replica::ReplicaCoder;
//! use ecpool::{ErrorKind, ErasureCoderPool};
//! use futures::executor::block_on;
//! use std::num::NonZeroUsize;
//! use std::result::Result;
//! use trackable::error::{Failure, Failed};
Expand All @@ -43,30 +44,30 @@
//!
//! // Encodes
//! let data = vec![0, 1, 2, 3];
//! let encoded = fibers_global::execute(coder.encode(data.clone()))?;
//! let encoded = block_on(coder.encode(data.clone()))?;
//!
//! // Decodes
//! assert_eq!(
//! Some(&data),
//! fibers_global::execute(coder.decode(encoded[0..].to_vec()))
//! block_on(coder.decode(encoded[0..].to_vec()))
//! .as_ref()
//! .ok()
//! );
//! assert_eq!(
//! Some(&data),
//! fibers_global::execute(coder.decode(encoded[1..].to_vec()))
//! block_on(coder.decode(encoded[1..].to_vec()))
//! .as_ref()
//! .ok()
//! );
//! assert_eq!(
//! Some(&data),
//! fibers_global::execute(coder.decode(encoded[2..].to_vec()))
//! block_on(coder.decode(encoded[2..].to_vec()))
//! .as_ref()
//! .ok()
//! );
//! assert_eq!(
//! Err(ErrorKind::InvalidInput),
//! fibers_global::execute(coder.decode(encoded[3..].to_vec())).map_err(|e| *e.kind())
//! block_on(coder.decode(encoded[3..].to_vec())).map_err(|e| *e.kind())
//! );
//! # Ok(())
//! # }
Expand All @@ -78,12 +79,8 @@
//! [openstack/liberasurecode]: https://github.com/openstack/liberasurecode
//! [`LibErasureCoder`]: ./liberasurecode/struct.LibErasureCoder.html
//! [`ReplicaCoder`]: ./replica/struct.ReplicaCoder.html
#![warn(missing_docs)]
extern crate fibers;
#[cfg(test)]
extern crate fibers_global;
extern crate fibers_tasque;
extern crate futures;
extern crate tokio_tasque;
#[macro_use]
extern crate trackable;

Expand Down
57 changes: 26 additions & 31 deletions src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use fibers_tasque::{AsyncCall, DefaultCpuTaskQueue, TaskQueueExt};
use futures::{Async, Future, Poll};
use futures::{Future, FutureExt};
use std::cell::RefCell;
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio_tasque::{AsyncCall, DefaultCpuTaskQueue, TaskQueueExt};
use trackable::error::ErrorKindExt;

use crate::{BuildCoder, ErasureCode, Error, ErrorKind, Fragment, FragmentBuf, Result};
use crate::{BuildCoder, ErasureCode, ErrorKind, Fragment, FragmentBuf, Result};

thread_local! {
static ERASURE_CODERS: RefCell<HashMap<String, Box<dyn ErasureCode>>> =
Expand All @@ -13,10 +15,10 @@ thread_local! {

/// Thread pool for encoding and decoding data by using an [`ErasureCode`] implementation.
///
/// Internally, this uses [`fibers_tasque::DefaultCpuTaskQueue`] for realizing thread pool functionality.
/// Internally, this uses [`tokio_tasque::DefaultCpuTaskQueue`] for realizing thread pool functionality.
///
/// [`ErasureCode`]: ./trait.ErasureCode.html
/// [`fibers_tasque::DefaultCpuTaskQueue`]: https://docs.rs/fibers_tasque/0.1/fibers_tasque/struct.DefaultCpuTaskQueue.html
/// [`tokio_tasque::DefaultCpuTaskQueue`]: https://docs.rs/tokio_tasque/0.1/tokio_tasque/struct.DefaultCpuTaskQueue.html
#[derive(Debug, Clone)]
pub struct ErasureCoderPool<B> {
builder: B,
Expand All @@ -33,7 +35,7 @@ impl<B: BuildCoder> ErasureCoderPool<B> {
///
/// The result vector contains `N` data fragments and `M` parity fragments
/// (where `N = self.data_fragments()` and `M = self.parity_fragments()`).
pub fn encode<T>(&self, data: T) -> impl Future<Item = Vec<FragmentBuf>, Error = Error>
pub fn encode<T>(&self, data: T) -> impl Future<Output = Result<Vec<FragmentBuf>>>
where
T: AsRef<[u8]> + Send + 'static,
{
Expand All @@ -48,7 +50,7 @@ impl<B: BuildCoder> ErasureCoderPool<B> {
/// The decoding process will be executed on a thread in the pool.
///
/// Note whether the correctness of the result data has been validated depends on the implementations.
pub fn decode<T>(&self, fragments: Vec<T>) -> impl Future<Item = Vec<u8>, Error = Error>
pub fn decode<T>(&self, fragments: Vec<T>) -> impl Future<Output = Result<Vec<u8>>>
where
T: AsRef<Fragment> + Send + 'static,
{
Expand All @@ -67,7 +69,7 @@ impl<B: BuildCoder> ErasureCoderPool<B> {
&self,
index: usize,
fragments: Vec<T>,
) -> impl Future<Item = Vec<u8>, Error = Error>
) -> impl Future<Output = Result<Vec<u8>>>
where
T: AsRef<Fragment> + Send + 'static,
{
Expand Down Expand Up @@ -97,16 +99,12 @@ impl<B: BuildCoder> ErasureCoderPool<B> {

struct LazyResult<T>(AsyncCall<Result<T>>);
impl<T> Future for LazyResult<T> {
type Item = T;
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if let Async::Ready(result) = track!(self.0.poll().map_err(|e| ErrorKind::Other.cause(e)))?
{
let value = result?;
Ok(Async::Ready(value))
} else {
Ok(Async::NotReady)
}
type Output = Result<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.0.poll_unpin(cx).map(|result| match result {
Ok(result) => track!(result),
Err(e) => track!(Err(ErrorKind::Other.cause(e).into())),
})
}
}

Expand All @@ -120,36 +118,33 @@ mod tests {
use crate::replica::ReplicaCoder;
use crate::ErrorKind;

#[test]
fn pool_works() -> Result<(), MainError> {
#[tokio::test(flavor = "multi_thread")]
async fn pool_works() -> Result<(), MainError> {
let data_fragments = track_assert_some!(NonZeroUsize::new(4), Failed);
let parity_fragments = track_assert_some!(NonZeroUsize::new(2), Failed);

let coder = ErasureCoderPool::new(ReplicaCoder::new(data_fragments, parity_fragments));
let data = vec![0, 1, 2, 3];
let encoded = track!(fibers_global::execute(coder.encode(data.clone())))?;
let encoded = track!(coder.encode(data.clone()).await)?;

assert_eq!(
Some(&data),
fibers_global::execute(coder.decode(encoded[0..].to_vec()))
.as_ref()
.ok()
coder.decode(encoded[0..].to_vec()).await.as_ref().ok()
);
assert_eq!(
Some(&data),
fibers_global::execute(coder.decode(encoded[1..].to_vec()))
.as_ref()
.ok()
coder.decode(encoded[1..].to_vec()).await.as_ref().ok()
);
assert_eq!(
Some(&data),
fibers_global::execute(coder.decode(encoded[2..].to_vec()))
.as_ref()
.ok()
coder.decode(encoded[2..].to_vec()).await.as_ref().ok()
);
assert_eq!(
Err(ErrorKind::InvalidInput),
fibers_global::execute(coder.decode(encoded[3..].to_vec())).map_err(|e| *e.kind())
coder
.decode(encoded[3..].to_vec())
.await
.map_err(|e| *e.kind())
);

Ok(())
Expand Down