From 46b1df8b73561df4d455e5efe7d24a0c6800bf0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivan=20Sch=C3=BCtz?= Date: Tue, 30 Jun 2020 19:10:06 +0200 Subject: [PATCH] Migrate TCNs storage to Sqlite (#114) --- Cargo.toml | 1 - src/composition_root.rs | 21 ++-- src/errors.rs | 9 -- src/lib.rs | 23 ---- src/preferences.rs | 25 ++++- src/reports_updater.rs | 228 +++++++++++++++++++++++++++++----------- 6 files changed, 197 insertions(+), 110 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ed73382..25d9052 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ name = "coepi_core" crate-type = ["staticlib", "cdylib"] [dependencies] -persy = "0.9" once_cell = "1.3.1" cbindgen = "0.9.0" serde_json = "1.0" diff --git a/src/composition_root.rs b/src/composition_root.rs index 1369dea..6c08661 100644 --- a/src/composition_root.rs +++ b/src/composition_root.rs @@ -5,7 +5,7 @@ use crate::reports_updater::{ }; use crate::{ errors::ServicesError, - expect_log, init_persy, + expect_log, preferences::{Database, Preferences, PreferencesDao, PreferencesImpl}, reporting::{ memo::{MemoMapper, MemoMapperImpl}, @@ -65,11 +65,6 @@ pub static COMP_ROOT: OnceCell< pub fn bootstrap(db_path: &str) -> Result<(), ServicesError> { info!("Bootstrapping with db path: {:?}", db_path); - // TODO should be in a dependency - let persy_path = format!("{}/db.persy", db_path); - debug!("Persy path: {:?}", persy_path); - init_persy(persy_path).map_err(ServicesError::from)?; - let sqlite_path = format!("{}/db.sqlite", db_path); debug!("Sqlite path: {:?}", sqlite_path); @@ -98,7 +93,7 @@ pub fn dependencies() -> &'static CompositionRoot< >, >, >, - ObservedTcnProcessorImpl<'static, TcnDaoImpl>, + ObservedTcnProcessorImpl, MemoMapperImpl, TcnKeysImpl, > { @@ -129,7 +124,7 @@ fn create_comp_root( >, >, >, - ObservedTcnProcessorImpl<'static, TcnDaoImpl>, + ObservedTcnProcessorImpl, MemoMapperImpl, TcnKeysImpl, > { @@ -139,7 +134,7 @@ fn create_comp_root( let connection = expect_log!(connection_res, "Couldn't create database!"); let database = Arc::new(Database::new(connection)); - let preferences_dao = PreferencesDao::new(database); + let preferences_dao = PreferencesDao::new(database.clone()); let preferences = Arc::new(PreferencesImpl { dao: preferences_dao, }); @@ -156,13 +151,13 @@ fn create_comp_root( api, }; - let tcn_dao = &TcnDaoImpl {}; + let tcn_dao = Arc::new(TcnDaoImpl::new(database.clone())); CompositionRoot { api, reports_updater: ReportsUpdater { preferences: preferences.clone(), - tcn_dao, + tcn_dao: tcn_dao.clone(), tcn_matcher: TcnMatcherRayon {}, api, memo_mapper, @@ -173,7 +168,9 @@ fn create_comp_root( inputs_submitter: symptom_inputs_submitter, }, }, - observed_tcn_processor: ObservedTcnProcessorImpl { tcn_dao }, + observed_tcn_processor: ObservedTcnProcessorImpl { + tcn_dao: tcn_dao.clone(), + }, tcn_keys: tcn_keys.clone(), } } diff --git a/src/errors.rs b/src/errors.rs index 82d0fa1..0c6c4c7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -59,15 +59,6 @@ impl From for ServicesError { } } -impl From for ServicesError { - fn from(error: persy::PersyError) -> Self { - ServicesError::Error(Box::new(StdError::new( - ErrorKind::Other, - format!("{}", error), - ))) - } -} - impl From for ServicesError { fn from(error: hex::FromHexError) -> Self { ServicesError::Error(Box::new(StdError::new( diff --git a/src/lib.rs b/src/lib.rs index cd21164..50ef070 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,6 @@ #[macro_use] extern crate serde_big_array; use errors::Error; -use once_cell::sync::OnceCell; -use persy::{Config, Persy, ValueMode}; -use std::path::Path; mod composition_root; mod errors; mod networking; @@ -22,26 +19,6 @@ mod android; pub type Res = Result; -const CENS_BY_TS: &str = "cens by ts"; - -pub fn init_persy>(p: P) -> Res<()> { - let db = Persy::open_or_create_with(p, Config::new(), |db| { - let mut tx = db.begin()?; - tx.create_segment("tcn")?; - tx.create_index::(CENS_BY_TS, ValueMode::CLUSTER)?; - tx.prepare_commit()?.commit()?; - Ok(()) - })?; - DB.set(db).map_err(|_| DB_ALREADY_INIT)?; - Ok(()) -} - -const DB_ALREADY_INIT: &str = "DB failed to initalize"; -pub const DB_UNINIT: &str = "DB not initialized"; - -// TODO since we're using DI put this in a dependency, to be consistent -pub static DB: OnceCell = OnceCell::new(); - // TODO refactor these (byte_vec_to) convertions or better way? // TODO move to utils file or similar. Consider returning Result instead of panicking. diff --git a/src/preferences.rs b/src/preferences.rs index 15c5de8..484c6d2 100644 --- a/src/preferences.rs +++ b/src/preferences.rs @@ -96,9 +96,9 @@ impl PreferencesDao { fn create_table_if_not_exists(db: &Arc) { let res = db.execute_sql( "create table if not exists preferences( - key text primary key, - value text not null - )", + key text primary key, + value text not null + )", params![], ); expect_log!(res, "Couldn't create preferences table"); @@ -122,6 +122,25 @@ impl Database { conn.execute(sql, pars) } + pub fn query(&self, sql: &str, params: P, f: F) -> Result, rusqlite::Error> + where + P: IntoIterator, + P::Item: ToSql, + F: Fn(&Row<'_>) -> T, + { + let res = self.conn.lock(); + let conn = expect_log!(res, "Couldn't lock mutex"); + + let mut statement = conn.prepare(sql)?; + let mut rows = statement.query(params)?; + + let mut objs = Vec::new(); + while let Some(row) = rows.next().unwrap() { + objs.push(f(row)); + } + Ok(objs) + } + pub fn query_row(&self, sql: &str, params: P, f: F) -> Result where P: IntoIterator, diff --git a/src/reports_updater.rs b/src/reports_updater.rs index d2e0b73..69df60b 100644 --- a/src/reports_updater.rs +++ b/src/reports_updater.rs @@ -1,18 +1,20 @@ use crate::{ - byte_vec_to_16_byte_array, byte_vec_to_24_byte_array, byte_vec_to_8_byte_array, + byte_vec_to_16_byte_array, errors::{Error, ServicesError}, + expect_log, networking::{NetworkingError, TcnApi}, - preferences::Preferences, + preferences::{Database, Preferences}, reporting::{ memo::{Memo, MemoMapper}, public_report::PublicReport, }, - reports_interval, DB, DB_UNINIT, + reports_interval, }; use chrono::Utc; use log::*; use rayon::prelude::*; use reports_interval::{ReportsInterval, UnixTime}; +use rusqlite::{params, Row, NO_PARAMS}; use serde::Serialize; use std::collections::HashMap; use std::{io::Cursor, sync::Arc, time::Instant}; @@ -100,38 +102,18 @@ pub struct ObservedTcn { time: UnixTime, } -impl ObservedTcn { - fn as_bytes(&self) -> [u8; 24] { - let tcn_bytes: [u8; 16] = self.tcn.0; - let time_bytes: [u8; 8] = self.time.value.as_bytes(); - let total_bytes = [&tcn_bytes[..], &time_bytes[..]].concat(); - byte_vec_to_24_byte_array(total_bytes) - } - - fn from_bytes(bytes: [u8; 24]) -> ObservedTcn { - let tcn_bytes: [u8; 16] = byte_vec_to_16_byte_array(bytes[0..16].to_vec()); - let time_bytes: [u8; 8] = byte_vec_to_8_byte_array(bytes[16..24].to_vec()); - let time = u64::from_le_bytes(time_bytes); - - ObservedTcn { - tcn: TemporaryContactNumber(tcn_bytes), - time: UnixTime { value: time }, - } - } -} - pub trait ObservedTcnProcessor { fn save(&self, tcn_str: &str) -> Result<(), ServicesError>; } -pub struct ObservedTcnProcessorImpl<'a, T> +pub struct ObservedTcnProcessorImpl where T: TcnDao, { - pub tcn_dao: &'a T, + pub tcn_dao: Arc, } -impl<'a, T> ObservedTcnProcessor for ObservedTcnProcessorImpl<'a, T> +impl ObservedTcnProcessor for ObservedTcnProcessorImpl where T: TcnDao, { @@ -145,42 +127,74 @@ where value: Utc::now().timestamp() as u64, }, }; - self.tcn_dao.save(observed_tcn) + + self.tcn_dao.save(&observed_tcn) } } pub trait TcnDao { fn all(&self) -> Result, ServicesError>; - fn save(&self, observed_tcn: ObservedTcn) -> Result<(), ServicesError>; + fn save(&self, observed_tcn: &ObservedTcn) -> Result<(), ServicesError>; } -pub struct TcnDaoImpl {} -impl TcnDao for TcnDaoImpl { - fn all(&self) -> Result, ServicesError> { - let mut out: Vec = Vec::new(); - - let items = DB - .get() - .ok_or(DB_UNINIT) - .map_err(Error::from)? - .scan("tcn") - .map_err(Error::from)?; - - for (_id, content) in items { - let byte_array: [u8; 24] = byte_vec_to_24_byte_array(content); - out.push(ObservedTcn::from_bytes(byte_array)); +pub struct TcnDaoImpl { + db: Arc, +} + +impl TcnDaoImpl { + fn create_table_if_not_exists(db: &Arc) { + // TODO use blob for tcn? https://docs.rs/rusqlite/0.23.1/rusqlite/blob/index.html + // TODO ideally FFI should send byte arrays too + let res = db.execute_sql( + "create table if not exists tcn( + tcn text not null, + contact_time integer not null + )", + params![], + ); + expect_log!(res, "Couldn't create tcn table"); + } + + fn to_tcn(row: &Row) -> ObservedTcn { + let tcn: Result = row.get(0); + let contact_time = row.get(1); + let tcn_value = expect_log!(tcn, "Invalid row: no TCN"); + let tcn_value_bytes_vec_res = hex::decode(tcn_value); + let tcn_value_bytes_vec = expect_log!(tcn_value_bytes_vec_res, "Invalid stored TCN format"); + let tcn_value_bytes = byte_vec_to_16_byte_array(tcn_value_bytes_vec); + let contact_time_value: i64 = expect_log!(contact_time, "Invalid row: no contact time"); + ObservedTcn { + tcn: TemporaryContactNumber(tcn_value_bytes), + time: UnixTime { + value: contact_time_value as u64, + }, } + } - Ok(out) + pub fn new(db: Arc) -> TcnDaoImpl { + Self::create_table_if_not_exists(&db); + TcnDaoImpl { db } + } +} + +impl TcnDao for TcnDaoImpl { + fn all(&self) -> Result, ServicesError> { + self.db + .query("select tcn, contact_time from tcn", NO_PARAMS, |row| { + Self::to_tcn(row) + }) + .map_err(ServicesError::from) } - fn save(&self, observed_tcn: ObservedTcn) -> Result<(), ServicesError> { - let db = DB.get().ok_or(DB_UNINIT)?; - let mut tx = db.begin()?; + fn save(&self, observed_tcn: &ObservedTcn) -> Result<(), ServicesError> { + let tcn_str = hex::encode(observed_tcn.tcn.0); - tx.insert_record("tcn", &observed_tcn.as_bytes())?; - // tx.put(CENS_BY_TS, ts, u128_of_tcn(tcn))?; - tx.prepare_commit()?.commit()?; + let res = self.db.execute_sql( + "insert or replace into tcn(tcn, contact_time) values(?1, ?2)", + // conversion to signed timestamp is safe, for obvious reasons. + params![tcn_str, observed_tcn.time.value as i64], + ); + expect_log!(res, "Couldn't insert tcn"); Ok(()) } } @@ -211,7 +225,7 @@ pub struct Alert { pub struct ReportsUpdater<'a, T: Preferences, U: TcnDao, V: TcnMatcher, W: TcnApi, X: MemoMapper> { pub preferences: Arc, - pub tcn_dao: &'a U, + pub tcn_dao: Arc, pub tcn_matcher: V, pub api: &'a W, pub memo_mapper: &'a X, @@ -478,6 +492,7 @@ mod tests { symptom_inputs::UserInput, }, }; + use rusqlite::Connection; use tcn::{MemoType, ReportAuthorizationKey}; #[test] @@ -559,19 +574,108 @@ mod tests { } #[test] - fn tcn_saved_and_restored_from_bytes() { - let mut tcn_bytes: [u8; 16] = [0; 16]; - tcn_bytes[1] = 1; - tcn_bytes[12] = 5; - tcn_bytes[15] = 250; - let time = UnixTime { value: 1590528300 }; + #[ignore] + fn saves_and_loads_observed_tcn() { + let database = Arc::new(Database::new( + Connection::open_in_memory().expect("Couldn't create database!"), + )); + let tcn_dao = TcnDaoImpl::new(database.clone()); + let observed_tcn = ObservedTcn { - tcn: TemporaryContactNumber(tcn_bytes), - time, + tcn: TemporaryContactNumber([ + 24, 229, 125, 245, 98, 86, 219, 221, 172, 25, 232, 150, 206, 66, 164, 173, + ]), + time: UnixTime { value: 1590528300 }, + }; + + let save_res = tcn_dao.save(&observed_tcn); + assert!(save_res.is_ok()); + + let loaded_tcns_res = tcn_dao.all(); + assert!(loaded_tcns_res.is_ok()); + + let loaded_tcns = loaded_tcns_res.unwrap(); + + assert_eq!(loaded_tcns.len(), 1); + assert_eq!(loaded_tcns[0], observed_tcn); + } + + #[test] + #[ignore] + fn saves_and_loads_multiple_tcns() { + let database = Arc::new(Database::new( + Connection::open_in_memory().expect("Couldn't create database!"), + )); + let tcn_dao = TcnDaoImpl::new(database.clone()); + + let observed_tcn_1 = ObservedTcn { + tcn: TemporaryContactNumber([ + 24, 229, 125, 245, 98, 86, 219, 221, 172, 25, 232, 150, 206, 66, 164, 173, + ]), + time: UnixTime { value: 1590528300 }, + }; + let observed_tcn_2 = ObservedTcn { + tcn: TemporaryContactNumber([ + 43, 229, 125, 245, 98, 86, 100, 1, 172, 25, 0, 150, 123, 66, 34, 12, + ]), + time: UnixTime { value: 1590518190 }, + }; + let observed_tcn_3 = ObservedTcn { + tcn: TemporaryContactNumber([ + 11, 246, 125, 123, 102, 86, 100, 1, 34, 25, 21, 150, 99, 66, 34, 0, + ]), + time: UnixTime { value: 2230522104 }, }; - let observed_tcn_as_bytes = observed_tcn.as_bytes(); - let observed_tc_from_bytes = ObservedTcn::from_bytes(observed_tcn_as_bytes); - assert_eq!(observed_tc_from_bytes, observed_tcn); + + let save_res_1 = tcn_dao.save(&observed_tcn_1); + let save_res_2 = tcn_dao.save(&observed_tcn_2); + let save_res_3 = tcn_dao.save(&observed_tcn_3); + assert!(save_res_1.is_ok()); + assert!(save_res_2.is_ok()); + assert!(save_res_3.is_ok()); + + let loaded_tcns_res = tcn_dao.all(); + assert!(loaded_tcns_res.is_ok()); + + let loaded_tcns = loaded_tcns_res.unwrap(); + + assert_eq!(loaded_tcns.len(), 3); + assert_eq!(loaded_tcns[0], observed_tcn_1); + assert_eq!(loaded_tcns[1], observed_tcn_2); + assert_eq!(loaded_tcns[2], observed_tcn_3); + } + + #[test] + #[ignore] + // Currently there's no unique, as for current use case it doesn't seem necessary/critical and + // it affects negatively performance. + // TODO revisit + fn saves_and_loads_repeated_tcns() { + let database = Arc::new(Database::new( + Connection::open_in_memory().expect("Couldn't create database!"), + )); + let tcn_dao = TcnDaoImpl::new(database.clone()); + + let observed_tcn_1 = ObservedTcn { + tcn: TemporaryContactNumber([ + 24, 229, 125, 245, 98, 86, 219, 221, 172, 25, 232, 150, 206, 66, 164, 173, + ]), + time: UnixTime { value: 1590528300 }, + }; + + let save_res_1 = tcn_dao.save(&observed_tcn_1); + let save_res_2 = tcn_dao.save(&observed_tcn_1); + assert!(save_res_1.is_ok()); + assert!(save_res_2.is_ok()); + + let loaded_tcns_res = tcn_dao.all(); + assert!(loaded_tcns_res.is_ok()); + + let loaded_tcns = loaded_tcns_res.unwrap(); + + assert_eq!(loaded_tcns.len(), 2); + assert_eq!(loaded_tcns[0], observed_tcn_1); + assert_eq!(loaded_tcns[1], observed_tcn_1); } // Utility to see quickly all TCNs (hex) for a report