Skip to content

Commit

Permalink
botlink: Use Arc for redis conn pool... (#212)
Browse files Browse the repository at this point in the history
* Track bot attached, update stream names

* Combine XAdders

* Use all the arcs
  • Loading branch information
Terkwood authored Apr 9, 2020
1 parent 574ac7d commit d5834c3
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 41 deletions.
2 changes: 1 addition & 1 deletion botlink/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion botlink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "botlink"
version = "0.0.2"
version = "0.0.3"
authors = ["terkwood <[email protected]>"]
edition = "2018"

Expand Down
5 changes: 5 additions & 0 deletions botlink/dev-cp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#!/bin/bash

CT_ROOT=bugout_botlink_1:/var/BUGOUT/botlink

docker cp src/. $CT_ROOT/src/. && docker cp Cargo.toml $CT_ROOT/Cargo.toml
4 changes: 2 additions & 2 deletions botlink/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ fn main() {
let ws_opts = websocket::WSOpts::from(&components);
thread::spawn(move || websocket::listen(ws_opts));
let mco = components.move_computed_out.clone();
let xmm = components.xadder_mm.clone();
thread::spawn(move || stream::write_moves(mco, xmm));
let xmm = components.xadder.clone();
thread::spawn(move || { stream::write_moves(mco, xmm) } );
stream::process(&mut stream::StreamOpts::from(components));
}
10 changes: 4 additions & 6 deletions botlink/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ pub struct Components {
pub game_repo: Box<dyn AttachedBotsRepo>,
pub entry_id_repo: Box<dyn EntryIdRepo>,
pub xreader: Box<dyn XReader>,
pub xadder_gs: Box<dyn XAdderGS>,
pub xadder_mm: Arc<dyn XAdderMM>,
pub xadder: Arc<dyn XAdder>,
pub compute_move_in: Sender<ComputeMove>,
pub compute_move_out: Receiver<ComputeMove>,
pub move_computed_in: Sender<MoveComputed>,
Expand All @@ -26,7 +25,7 @@ impl Default for Components {
let (move_computed_in, move_computed_out): (Sender<MoveComputed>, Receiver<MoveComputed>) =
unbounded();

let pool = redis_conn_pool::create(RedisHostUrl::default());
let pool = Arc::new(redis_conn_pool::create(RedisHostUrl::default()));
Components {
game_repo: Box::new(RedisAttachedBotsRepo {
pool: pool.clone(),
Expand All @@ -36,9 +35,8 @@ impl Default for Components {
pool: pool.clone(),
key_provider: crate::repo::redis_keys::KeyProvider::default(),
}),
xreader: Box::new(RedisXReader { pool: pool.clone() }),
xadder_gs: Box::new(RedisXAdderGS { pool: pool.clone() }),
xadder_mm: Arc::new(RedisXAdderMM { pool }),
xreader: Box::new(RedisXReader { pool: pool.clone() }),
xadder: Arc::new(RedisXAdder { pool }),
compute_move_in,
compute_move_out,
move_computed_in,
Expand Down
4 changes: 3 additions & 1 deletion botlink/src/repo/attached_bots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use micro_model_moves::{GameId, Player};
use redis::Commands;
use redis_conn_pool::{r2d2, r2d2_redis, redis, Pool};

use std::sync::Arc;

pub trait AttachedBotsRepo {
fn is_attached(&self, game_id: &GameId, player: Player) -> Result<bool, RepoErr>;

Expand All @@ -13,7 +15,7 @@ pub trait AttachedBotsRepo {
const TTL_SECS: usize = 86400;

pub struct RedisAttachedBotsRepo {
pub pool: Pool,
pub pool: Arc<Pool>,
pub key_provider: KeyProvider,
}

Expand Down
5 changes: 4 additions & 1 deletion botlink/src/repo/entry_ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ use redis_conn_pool::redis::Commands;
use redis_conn_pool::{redis, Pool};
use redis_streams::XReadEntryId;
use std::collections::HashMap;
use std::sync::Arc;

pub trait EntryIdRepo {
fn fetch_all(&self) -> Result<AllEntryIds, super::RepoErr>;

Expand All @@ -13,7 +15,8 @@ pub trait EntryIdRepo {
}

pub struct RedisEntryIdRepo {
pub pool: Pool,
pub pool: Arc<Pool>
,
pub key_provider: super::redis_keys::KeyProvider,
}
const EMPTY_EID: &str = "0-0";
Expand Down
39 changes: 31 additions & 8 deletions botlink/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use log::{error, info};
use micro_model_bot::gateway::AttachBot;
use micro_model_bot::ComputeMove;
use micro_model_moves::GameState;
use std::sync::Arc;
pub use write_moves::write_moves;
use xread::StreamData;

Expand Down Expand Up @@ -43,9 +44,18 @@ pub fn process(opts: &mut StreamOpts) {
}

if let Err(e) =
opts.xadder.xadd_game_state(game_id, game_state)
opts.xadder.xadd_game_state(&game_id, &game_state)
{
error!("Error writing redis stream for game state changelog : {:?}",e)
} else {
if let Err(e) = opts.xadder.xadd_bot_attached(
micro_model_bot::gateway::BotAttached {
game_id,
player,
},
) {
error!("Error xadd bot attached {:?}", e)
}
}
}
}
Expand Down Expand Up @@ -93,7 +103,7 @@ pub struct StreamOpts {
pub attached_bots_repo: Box<dyn AttachedBotsRepo>,
pub entry_id_repo: Box<dyn EntryIdRepo>,
pub xreader: Box<dyn xread::XReader>,
pub xadder: Box<dyn xadd::XAdderGS>,
pub xadder: Arc<dyn xadd::XAdder>,
pub compute_move_in: Sender<ComputeMove>,
}

Expand All @@ -103,7 +113,7 @@ impl StreamOpts {
attached_bots_repo: components.game_repo,
entry_id_repo: components.entry_id_repo,
xreader: components.xreader,
xadder: components.xadder_gs,
xadder: components.xadder,
compute_move_in: components.compute_move_in,
}
}
Expand All @@ -113,6 +123,7 @@ impl StreamOpts {
mod tests {
use super::*;
use crate::repo::*;
use crate::stream::xadd::*;
use crossbeam_channel::{after, never, select, unbounded, Receiver};
use micro_model_moves::*;
use redis_streams::XReadEntryId;
Expand Down Expand Up @@ -188,13 +199,25 @@ mod tests {
struct FakeXAdder {
added_in: Sender<(GameId, GameState)>,
}
impl xadd::XAdderGS for FakeXAdder {
impl xadd::XAdder for FakeXAdder {
fn xadd_game_state(
&self,
game_id: GameId,
game_state: GameState,
game_id: &GameId,
game_state: &GameState,
) -> Result<(), XAddError> {
Ok(self
.added_in
.send((game_id.clone(), game_state.clone()))
.expect("send add"))
}
fn xadd_bot_attached(
&self,
_bot_attached: micro_model_bot::gateway::BotAttached,
) -> Result<(), crate::stream::xadd::XAddError> {
Ok(self.added_in.send((game_id, game_state)).expect("send add"))
Ok(())
}
fn xadd_make_move_command(&self, _command: MakeMoveCommand) -> Result<(), XAddError> {
Ok(info!("Doing nothing for xadd make move"))
}
}

Expand Down Expand Up @@ -269,7 +292,7 @@ mod tests {
board_size,
incoming_game_state: incoming_game_state.clone(),
});
let xadder = Box::new(FakeXAdder { added_in });
let xadder = Arc::new(FakeXAdder { added_in });

thread::spawn(move || {
let mut opts = StreamOpts {
Expand Down
3 changes: 2 additions & 1 deletion botlink/src/stream/topics.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub const ATTACH_BOT_EV: &str = "bugout-attach-bot-ev";
pub const ATTACH_BOT_CMD: &str = "bugout-attach-bot-cmd";
pub const GAME_STATES_CHANGELOG: &str = "bugout-game-states";
pub const MAKE_MOVE_CMD: &str = "bugout-make-move-cmd";
pub const BOT_ATTACHED_EV: &str = "bugout-bot-attached-ev";
4 changes: 2 additions & 2 deletions botlink/src/stream/write_moves.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::xadd::XAdderMM;
use super::xadd::XAdder;
use crossbeam_channel::{select, Receiver};
use log::error;
use micro_model_bot::MoveComputed;
use std::sync::Arc;
pub fn write_moves(move_computed_out: Receiver<MoveComputed>, xadder: Arc<dyn XAdderMM>) {
pub fn write_moves(move_computed_out: Receiver<MoveComputed>, xadder: Arc<dyn XAdder>) {
loop {
select! {
recv(move_computed_out) -> msg => match msg {
Expand Down
38 changes: 24 additions & 14 deletions botlink/src/stream/xadd.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::stream::topics;
use micro_model_moves::{Coord, GameId, GameState, MakeMoveCommand};
use micro_model_bot::gateway::BotAttached;
use redis_conn_pool::redis::RedisError;
use redis_conn_pool::{redis, Pool};
pub trait XAdderGS {
fn xadd_game_state(&self, game_id: GameId, game_state: GameState) -> Result<(), XAddError>;
}

pub trait XAdderMM: Send + Sync {
use std::sync::Arc;

pub trait XAdder: Send + Sync {
fn xadd_game_state(&self, game_id: &GameId, game_state: &GameState) -> Result<(), XAddError>;
fn xadd_make_move_command(&self, command: MakeMoveCommand) -> Result<(), XAddError>;
fn xadd_bot_attached(&self, bot_attached: BotAttached) -> Result<(), XAddError>;
}

#[derive(Debug)]
Expand All @@ -16,11 +18,11 @@ pub enum XAddError {
Ser(Box<bincode::ErrorKind>),
}

pub struct RedisXAdderGS {
pub pool: Pool,
pub struct RedisXAdder {
pub pool: Arc<Pool>,
}
impl XAdderGS for RedisXAdderGS {
fn xadd_game_state(&self, game_id: GameId, game_state: GameState) -> Result<(), XAddError> {
impl XAdder for RedisXAdder {
fn xadd_game_state(&self, game_id: &GameId, game_state: &GameState) -> Result<(), XAddError> {
let mut conn = self.pool.get().expect("redis pool");
redis::cmd("XADD")
.arg(topics::GAME_STATES_CHANGELOG)
Expand All @@ -35,12 +37,6 @@ impl XAdderGS for RedisXAdderGS {
.query::<String>(&mut *conn)?;
Ok(())
}
}

pub struct RedisXAdderMM {
pub pool: Pool,
}
impl XAdderMM for RedisXAdderMM {
fn xadd_make_move_command(&self, command: MakeMoveCommand) -> Result<(), XAddError> {
let mut conn = self.pool.get().unwrap();

Expand All @@ -63,8 +59,22 @@ impl XAdderMM for RedisXAdderMM {
redis_cmd.query::<String>(&mut *conn)?;
Ok(())
}
fn xadd_bot_attached(&self, bot_attached: BotAttached) -> Result<(), XAddError> {
let mut conn = self.pool.get().expect("redis pool");
redis::cmd("XADD")
.arg(topics::BOT_ATTACHED_EV)
.arg("MAXLEN")
.arg("~")
.arg("1000")
.arg("*")
.arg("data")
.arg(bot_attached.serialize()?)
.query::<String>(&mut *conn)?;
Ok(())
}
}


impl From<RedisError> for XAddError {
fn from(r: RedisError) -> Self {
XAddError::Redis(r)
Expand Down
14 changes: 10 additions & 4 deletions botlink/src/stream/xread.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use super::topics;
use crate::repo::AllEntryIds;
use log::warn;
use log::{trace, warn};
use micro_model_bot::gateway::AttachBot;
use micro_model_moves::{GameId, GameState};
use redis_conn_pool::redis;
use redis_conn_pool::Pool;
use redis_streams::XReadEntryId;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use uuid::Uuid;

const BLOCK_MSEC: u32 = 5000;
Expand All @@ -25,19 +26,24 @@ pub trait XReader {
}

pub struct RedisXReader {
pub pool: Pool,
pub pool: Arc<Pool>,
}
impl XReader for RedisXReader {
fn xread_sorted(
&self,
entry_ids: AllEntryIds,
) -> Result<std::vec::Vec<(XReadEntryId, StreamData)>, redis::RedisError> {
trace!(
"xreading from {} and {}",
topics::ATTACH_BOT_CMD,
topics::GAME_STATES_CHANGELOG
);
let mut conn = self.pool.get().unwrap();
let xrr = redis::cmd("XREAD")
.arg("BLOCK")
.arg(&BLOCK_MSEC.to_string())
.arg("STREAMS")
.arg(topics::ATTACH_BOT_EV)
.arg(topics::ATTACH_BOT_CMD)
.arg(topics::GAME_STATES_CHANGELOG)
.arg(entry_ids.attach_bot_eid.to_string())
.arg(entry_ids.game_states_eid.to_string())
Expand Down Expand Up @@ -90,7 +96,7 @@ fn deser(xread_result: XReadResult) -> HashMap<XReadEntryId, StreamData> {
}
}
}
} else if &xread_topic[..] == topics::ATTACH_BOT_EV {
} else if &xread_topic[..] == topics::ATTACH_BOT_CMD {
for with_timestamps in xread_data {
for (k, v) in with_timestamps {
let shape: Result<(String, Vec<u8>), _> = // data <bin>
Expand Down
2 changes: 2 additions & 0 deletions botlink/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub fn listen(opts: WSOpts) {
}
}
if is_authorized {
info!("Connection open");
Ok(response)
} else {
warn!("No Auth");
Expand All @@ -40,6 +41,7 @@ pub fn listen(opts: WSOpts) {
.expect("cannot form response"))
}
} else {
info!("Connection open");
Ok(response)
}
};
Expand Down

0 comments on commit d5834c3

Please sign in to comment.