Skip to content

Commit

Permalink
Botlink: Use tokio for websocket (#213)
Browse files Browse the repository at this point in the history
* Add tokio deps

* Use callback for header

* Implement async websocket

* Nitpick whitespace

* Swallow recv error

* Initial blocking recv

* dedupe code

* Just block

* Rework startup

* Use select on channel

* downgrade some arcs

* Trim

* Revert file
  • Loading branch information
Terkwood authored Apr 10, 2020
1 parent d5834c3 commit a47c71e
Show file tree
Hide file tree
Showing 12 changed files with 261 additions and 107 deletions.
116 changes: 113 additions & 3 deletions botlink/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion botlink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "botlink"
version = "0.0.3"
version = "0.1.0"
authors = ["terkwood <[email protected]>"]
edition = "2018"

Expand All @@ -21,3 +21,6 @@ lazy_static = "1.4.0"
dotenv = "0.15.0"
uuid = { version = "0.8.1", features = ["v4", "serde"] }
base64 = "0.12.0"
futures-util = { version="0.3.4", default-features = false, features = ["async-await", "sink", "std"] }
tokio-tungstenite = "0.10.1"
tokio = { version="0.2.17", default-features = false, features = ["io-std", "io-util", "macros", "stream", "time"] }
4 changes: 1 addition & 3 deletions botlink/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ const ENV_ADDRESS: &str = "ADDRESS";
const DEFAULT_ADDRESS: &str = "127.0.0.1:3012";
lazy_static! {
pub static ref AUTHORIZATION: Option<String> = env::var(ENV_AUTHORIZATION).ok();
pub static ref ADDRESS: String = env::var(ENV_ADDRESS)
.unwrap_or(DEFAULT_ADDRESS.to_string())
.to_string();
pub static ref ADDRESS: String = env::var(ENV_ADDRESS).unwrap_or(DEFAULT_ADDRESS.to_string());
}

pub fn init() {
Expand Down
2 changes: 2 additions & 0 deletions botlink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ extern crate env_logger;
extern crate log;
extern crate micro_model_bot;
extern crate micro_model_moves;
pub extern crate tokio;
extern crate tokio_tungstenite;
extern crate uuid;

pub mod env;
Expand Down
13 changes: 8 additions & 5 deletions botlink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
extern crate botlink;

use botlink::{stream, websocket};
use log::info;
use std::thread;
const VERSION: &str = env!("CARGO_PKG_VERSION");

use botlink::registry::Components;
fn main() {

#[tokio::main]
async fn main() {
env_logger::init();
botlink::env::init();
info!("🔢 {}", VERSION);

let components = Components::default();
let ws_opts = websocket::WSOpts::from(&components);
thread::spawn(move || websocket::listen(ws_opts));
let mco = components.move_computed_out.clone();
let xmm = components.xadder.clone();
thread::spawn(move || { stream::write_moves(mco, xmm) } );
stream::process(&mut stream::StreamOpts::from(components));

thread::spawn(move || stream::write_moves(mco, xmm));
thread::spawn(move || stream::process(&mut stream::StreamOpts::from(components)));
websocket::listen(ws_opts).await;
}
6 changes: 3 additions & 3 deletions botlink/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use redis_conn_pool::RedisHostUrl;
use std::sync::Arc;

pub struct Components {
pub game_repo: Box<dyn AttachedBotsRepo>,
pub ab_repo: Box<dyn AttachedBotsRepo>,
pub entry_id_repo: Box<dyn EntryIdRepo>,
pub xreader: Box<dyn XReader>,
pub xadder: Arc<dyn XAdder>,
Expand All @@ -27,15 +27,15 @@ impl Default for Components {

let pool = Arc::new(redis_conn_pool::create(RedisHostUrl::default()));
Components {
game_repo: Box::new(RedisAttachedBotsRepo {
ab_repo: Box::new(RedisAttachedBotsRepo {
pool: pool.clone(),
key_provider: crate::repo::redis_keys::KeyProvider::default(),
}),
entry_id_repo: Box::new(RedisEntryIdRepo {
pool: pool.clone(),
key_provider: crate::repo::redis_keys::KeyProvider::default(),
}),
xreader: Box::new(RedisXReader { pool: pool.clone() }),
xreader: Box::new(RedisXReader { pool: pool.clone() }),
xadder: Arc::new(RedisXAdder { pool }),
compute_move_in,
compute_move_out,
Expand Down
2 changes: 1 addition & 1 deletion botlink/src/repo/attached_bots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use redis_conn_pool::{r2d2, r2d2_redis, redis, Pool};

use std::sync::Arc;

pub trait AttachedBotsRepo {
pub trait AttachedBotsRepo: Send + Sync {
fn is_attached(&self, game_id: &GameId, player: Player) -> Result<bool, RepoErr>;

fn attach(&mut self, game_id: &GameId, player: Player) -> Result<(), RepoErr>;
Expand Down
5 changes: 2 additions & 3 deletions botlink/src/repo/entry_ids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use redis_streams::XReadEntryId;
use std::collections::HashMap;
use std::sync::Arc;

pub trait EntryIdRepo {
pub trait EntryIdRepo: Send + Sync {
fn fetch_all(&self) -> Result<AllEntryIds, super::RepoErr>;

fn update(
Expand All @@ -15,8 +15,7 @@ pub trait EntryIdRepo {
}

pub struct RedisEntryIdRepo {
pub pool: Arc<Pool>
,
pub pool: Arc<Pool>,
pub key_provider: super::redis_keys::KeyProvider,
}
const EMPTY_EID: &str = "0-0";
Expand Down
43 changes: 18 additions & 25 deletions botlink/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,25 @@ pub fn process(opts: &mut StreamOpts) {
) => {
if let Err(e) = opts.attached_bots_repo.attach(&game_id, player) {
error!("Error attaching bot {:?}", e)
} else if let Err(e) = opts
.entry_id_repo
.update(EntryIdType::AttachBotEvent, entry_id)
{
error!("Error saving entry ID for attach bot {:?}", e)
} else {
if let Err(e) = opts
.entry_id_repo
.update(EntryIdType::AttachBotEvent, entry_id)
{
error!("Error saving entry ID for attach bot {:?}", e)
} else {
let mut game_state = GameState::default();
if let Some(bs) = board_size {
game_state.board.size = bs.into()
}
let mut game_state = GameState::default();
if let Some(bs) = board_size {
game_state.board.size = bs.into()
}

if let Err(e) =
opts.xadder.xadd_game_state(&game_id, &game_state)
{
error!("Error writing redis stream for game state changelog : {:?}",e)
} else {
if let Err(e) = opts.xadder.xadd_bot_attached(
micro_model_bot::gateway::BotAttached {
game_id,
player,
},
) {
error!("Error xadd bot attached {:?}", e)
}
}
if let Err(e) =
opts.xadder.xadd_game_state(&game_id, &game_state)
{
error!("Error writing redis stream for game state changelog : {:?}",e)
} else if let Err(e) = opts.xadder.xadd_bot_attached(
micro_model_bot::gateway::BotAttached { game_id, player },
) {
error!("Error xadd bot attached {:?}", e)
}
}
}
Expand Down Expand Up @@ -110,7 +103,7 @@ pub struct StreamOpts {
impl StreamOpts {
pub fn from(components: Components) -> Self {
StreamOpts {
attached_bots_repo: components.game_repo,
attached_bots_repo: components.ab_repo,
entry_id_repo: components.entry_id_repo,
xreader: components.xreader,
xadder: components.xadder,
Expand Down
13 changes: 9 additions & 4 deletions botlink/src/stream/write_moves.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ use crossbeam_channel::{select, Receiver};
use log::error;
use micro_model_bot::MoveComputed;
use std::sync::Arc;

pub fn write_moves(move_computed_out: Receiver<MoveComputed>, xadder: Arc<dyn XAdder>) {
loop {
select! {
recv(move_computed_out) -> msg => match msg {
Ok(MoveComputed(command)) => if let Err(e)=xadder.xadd_make_move_command(command) {error!("could not xadd move command : {:?}",e)},
Err(e) => error!("Unable to receive move computed out {:?}",e)
select! { recv(move_computed_out) -> msg =>
match msg {
Ok(MoveComputed(command)) =>
if let Err(e)=xadder.xadd_make_move_command(command) {
error!("could not xadd move command : {:?}",e)
}
Err(e) =>
error!("loop recv: {}", e)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion botlink/src/stream/xread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub type XReadResult = Vec<HashMap<String, Vec<HashMap<String, redis::Value>>>>;
/// xread_sorted performs a redis xread then sorts the results
///
/// entry_ids: the minimum entry ids from which to read
pub trait XReader {
pub trait XReader: Send + Sync {
fn xread_sorted(
&self,
entry_ids: AllEntryIds,
Expand Down
Loading

0 comments on commit a47c71e

Please sign in to comment.