diff --git a/io-engine/local-randrw-0-verify.state b/io-engine/local-randrw-0-verify.state deleted file mode 100644 index 9d3146c5d..000000000 Binary files a/io-engine/local-randrw-0-verify.state and /dev/null differ diff --git a/io-engine/src/persistent_store.rs b/io-engine/src/persistent_store.rs index 7d56b52c6..8f5f334bc 100644 --- a/io-engine/src/persistent_store.rs +++ b/io-engine/src/persistent_store.rs @@ -9,9 +9,12 @@ use crate::{ core::Reactor, store::{ etcd::Etcd, - store_defs::{DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue}, + store_defs::{ + DeleteWait, GetWait, PutWait, Store, StoreError, StoreKey, StoreValue, TxnWait, + }, }, }; +use etcd_client::{Compare, TxnOp, TxnResponse}; use futures::channel::oneshot; use once_cell::sync::OnceCell; use parking_lot::Mutex; @@ -193,6 +196,26 @@ impl PersistentStore { })? } + /// Executes a transaction for the given key. + pub async fn txn( + key: &impl StoreKey, + cmps: Vec, + ops_success: Vec, + ops_failure: Option>, + ) -> Result { + let key_string = key.to_string(); + let rx = Self::execute_store_op(async move { + info!("Executing transaction for key {}.", key_string); + Self::backing_store() + .txn_kv(&key_string, cmps, ops_success, ops_failure) + .await + }); + + rx.await.context(TxnWait { + key: key.to_string(), + })? + } + /// Retrieves a value, with the given key, from the store. pub async fn get(key: &impl StoreKey) -> Result { let key_string = key.to_string(); diff --git a/io-engine/src/store/etcd.rs b/io-engine/src/store/etcd.rs index d470c2e03..6b4c27190 100644 --- a/io-engine/src/store/etcd.rs +++ b/io-engine/src/store/etcd.rs @@ -2,10 +2,10 @@ use crate::store::store_defs::{ Connect, Delete, DeserialiseValue, Get, Put, SerialiseValue, Store, StoreError, - StoreError::MissingEntry, StoreKey, StoreValue, ValueString, + StoreError::MissingEntry, StoreKey, StoreValue, Txn as TxnErr, ValueString, }; use async_trait::async_trait; -use etcd_client::Client; +use etcd_client::{Client, Compare, Txn, TxnOp, TxnResponse}; use serde_json::Value; use snafu::ResultExt; @@ -49,6 +49,25 @@ impl Store for Etcd { Ok(()) } + /// Executes a transaction for the given key. If the compares succeed, then + /// ops_success will be executed atomically, otherwise ops_failure will be + /// executed atomically. + async fn txn_kv( + &mut self, + key: &K, + cmps: Vec, + ops_success: Vec, + ops_failure: Option>, + ) -> Result { + let fops = ops_failure.map_or(vec![], |v| v); + self.0 + .txn(Txn::new().when(cmps).and_then(ops_success).or_else(fops)) + .await + .context(TxnErr { + key: key.to_string(), + }) + } + /// 'Get' the value for the given key from etcd. async fn get_kv(&mut self, key: &K) -> Result { let resp = self.0.get(key.to_string(), None).await.context(Get { diff --git a/io-engine/src/store/store_defs.rs b/io-engine/src/store/store_defs.rs index fc0bc8d68..57826d763 100644 --- a/io-engine/src/store/store_defs.rs +++ b/io-engine/src/store/store_defs.rs @@ -1,7 +1,7 @@ //! Definition of a trait for a key-value store together with its error codes. use async_trait::async_trait; -use etcd_client::Error; +use etcd_client::{Compare, Error, TxnOp, TxnResponse}; use serde_json::{Error as SerdeError, Value}; use snafu::Snafu; @@ -56,6 +56,18 @@ pub enum StoreError { key: String, source: futures::channel::oneshot::Canceled, }, + /// Failed to complete a 'transaction'. + #[snafu(display("Failed to do 'transaction' for key {}. Error {}", key, source))] + Txn { key: String, source: Error }, + /// Failed to wait for 'transaction' operations. + #[snafu(display( + "Failed to wait for 'transaction' operations to complete for key {}.", + key, + ))] + TxnWait { + key: String, + source: futures::channel::oneshot::Canceled, + }, /// Failed to 'watch' an entry in the store. #[snafu(display("Failed to 'watch' entry with key {}. Error {}", key, source))] Watch { key: String, source: Error }, @@ -93,6 +105,14 @@ pub trait Store: Sync + Send + Clone { value: &V, ) -> Result<(), StoreError>; + async fn txn_kv( + &mut self, + key: &K, + cmps: Vec, + ops_success: Vec, + ops_failure: Option>, + ) -> Result; + /// Get an entry from the store. async fn get_kv(&mut self, key: &K) -> Result; diff --git a/io-engine/tests/persistence.rs b/io-engine/tests/persistence.rs index c8b0c7e57..43a855973 100644 --- a/io-engine/tests/persistence.rs +++ b/io-engine/tests/persistence.rs @@ -8,17 +8,22 @@ use common::compose::{ }, GrpcConnect, RpcHandle, }, - Binary, Builder, ComposeTest, ContainerSpec, + Binary, Builder, ComposeTest, ContainerSpec, MayastorTest, }; -use etcd_client::Client; - -use io_engine::bdev::nexus::{ChildInfo, NexusInfo}; +use etcd_client::{Client, Compare, CompareOp, TxnOp}; +use io_engine::{ + bdev::nexus::{ChildInfo, NexusInfo}, + core::MayastorCliArgs, + persistent_store::{PersistentStore, PersistentStoreBuilder}, +}; use std::{convert::TryFrom, thread::sleep, time::Duration}; use url::Url; pub mod common; +use once_cell::sync::OnceCell; +static MAYASTOR: OnceCell = OnceCell::new(); static ETCD_ENDPOINT: &str = "0.0.0.0:2379"; static CHILD1_UUID: &str = "d61b2fdf-1be8-457a-a481-70a42d0a2223"; static CHILD2_UUID: &str = "094ae8c6-46aa-4139-b4f2-550d39645db3"; @@ -519,3 +524,83 @@ pub(crate) fn uuid(uri: &str) -> String { } panic!("URI does not contain a uuid query parameter."); } + +// This test does a successful etcd transaction that upon successful CompareOp, modifies +// an existing key and adds a new key. Then does another transaction which is +// fails the CompareOp and hence that transaction is expected to fail. +#[tokio::test] +async fn pstor_txn_api() { + let dummy_key1 = "dummy_key_1".to_string(); + let dummy_key2 = "dummy_key_2".to_string(); + let pre_txn_value_k1 = "pre_txn_value_key1".to_string(); + let post_txn_value_k1 = "post_txn_value_key1".to_string(); + let post_txn_value_k2 = "post_txn_value_key2".to_string(); + common::composer_init(); + + let _test = Builder::new() + .name("etcd_txn_test") + .add_container_spec( + ContainerSpec::from_binary( + "etcd", + Binary::from_path(env!("ETCD_BIN")).with_args(vec![ + "--data-dir", + "/tmp/etcd-data", + "--advertise-client-urls", + "http://0.0.0.0:2379", + "--listen-client-urls", + "http://0.0.0.0:2379", + ]), + ) + .with_portmap("2379", "2379") + .with_portmap("2380", "2380"), + ) + .build() + .await + .unwrap(); + + PersistentStoreBuilder::new() + .with_endpoint(ETCD_ENDPOINT) + .connect() + .await; + + // create the mayastor test instance + let ms = MayastorTest::new(MayastorCliArgs { + log_components: vec!["all".into()], + reactor_mask: "0x3".to_string(), + no_pci: true, + ..Default::default() + }); + + let _ms = MAYASTOR.get_or_init(|| ms); + MAYASTOR + .get() + .unwrap() + .spawn(async move { + let _ = PersistentStore::put(&dummy_key1, &pre_txn_value_k1).await; + let expect = serde_json::to_vec(&pre_txn_value_k1).unwrap(); + + // Transaction - expected to succeed. + let cmp = vec![Compare::value( + dummy_key1.to_string(), + CompareOp::Equal, + expect, + )]; + let ops = vec![ + TxnOp::put(dummy_key1.to_string(), post_txn_value_k1, None), + TxnOp::put(dummy_key2.to_string(), post_txn_value_k2, None), + ]; + let txn_resp = PersistentStore::txn(&dummy_key1, cmp.clone(), ops, None) + .await + .unwrap(); + assert!(txn_resp.succeeded()); + + // A new transaction - expected to fail. Execute fops upon compare failure. + let ops = vec![TxnOp::delete(dummy_key1.to_string(), None)]; + let fops = vec![TxnOp::delete(dummy_key2.to_string(), None)]; + let txn_resp = PersistentStore::txn(&dummy_key1, cmp, ops, Some(fops)) + .await + .unwrap(); + assert!(!txn_resp.succeeded()); + }) + .await; +}