Skip to content

Commit

Permalink
Gateway: implement backend-doubler (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
Terkwood authored Apr 4, 2020
1 parent 8d972c0 commit 1b045a6
Show file tree
Hide file tree
Showing 26 changed files with 768 additions and 261 deletions.
167 changes: 149 additions & 18 deletions gateway/Cargo.lock

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions gateway/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "gateway"
version = "0.12.2"
authors = ["Terkwood <metaterkhorn@gmail.com>"]
version = "0.12.3"
authors = ["terkwood <[email protected].com>"]
edition = "2018"

[dependencies]
Expand All @@ -10,8 +10,8 @@ crossbeam-channel = "0.4.0"
mio-extras = "2.0.6"
rand = "0.7.2"
rdkafka = "0.23.0"
serde = "1.0.103"
serde_derive = "1.0.103"
serde = "1.0.106"
serde_derive = "1.0.106"
serde_json = "1.0.44"
time = "0.1.42"
uuid = { version = "0.8.1", features = ["v4", "serde"] }
Expand All @@ -23,6 +23,10 @@ dotenv = "0.15.0"
futures = "0.3.0"
chrono = { version = "0.4.10", features = ["serde"] }
r2d2_redis = "0.12.0"
log = "0.4.8"
env_logger = "0.7.1"
micro_model_moves = { git = "https://github.com/Terkwood/BUGOUT", branch = "unstable" }
micro_model_bot = { git = "https://github.com/Terkwood/BUGOUT", branch = "unstable" }

[dev-dependencies]
rand = "0.7.2"
2 changes: 2 additions & 0 deletions gateway/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ COPY . /var/BUGOUT/gateway/.

RUN cargo install --path .

ENV RUST_LOG info

CMD ["gateway"]
121 changes: 121 additions & 0 deletions gateway/src/backend/doubler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use crate::backend_commands::BackendCommands;

use crossbeam_channel::{select, Receiver, Sender};
use log::error;
pub fn double_commands(opts: DoublerOpts) {
loop {
select! {
recv(opts.session_commands_out) -> msg => match msg {
Ok( backend_command ) => {
if let Err(e) = opts.redis_commands_in.send(backend_command.clone()) {
error!("err doubler 0 {:?}",e)
}

if let Err(e) = opts.kafka_commands_in.send(backend_command) {
error!("FAILED doubler TO BACKEND {:?}", e)
}
}
Err(e) => error!("session command out: {:?}",e)
}
}
}
}

pub struct DoublerOpts {
pub session_commands_out: Receiver<BackendCommands>,
pub kafka_commands_in: Sender<BackendCommands>,
pub redis_commands_in: Sender<BackendCommands>,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::backend_commands::*;
use crate::model::*;

use crossbeam_channel::{select, unbounded};
use std::thread;
use uuid::Uuid;

#[test]
fn test_double_commands() {
let (session_commands_in, session_commands_out): (
Sender<BackendCommands>,
Receiver<BackendCommands>,
) = unbounded();

let (kafka_commands_in, kafka_commands_out): (
Sender<BackendCommands>,
Receiver<BackendCommands>,
) = unbounded();

let (redis_commands_in, redis_commands_out): (
Sender<BackendCommands>,
Receiver<BackendCommands>,
) = unbounded();
thread::spawn(move || {
let opts = DoublerOpts {
kafka_commands_in,
redis_commands_in,
session_commands_out,
};
double_commands(opts)
});

{
let session_id = Uuid::new_v4();
let client_id = Uuid::new_v4();
session_commands_in
.send(BackendCommands::FindPublicGame(
FindPublicGameBackendCommand {
session_id,
client_id,
},
))
.expect("send0")
}

{
let game_id = micro_model_moves::GameId(Uuid::new_v4());
let player = Player::WHITE;
session_commands_in
.send(BackendCommands::AttachBot(
micro_model_bot::gateway::AttachBot {
game_id,
player: match player {
Player::WHITE => micro_model_moves::Player::WHITE,
_ => micro_model_moves::Player::BLACK,
},
},
))
.expect("send1")
}

select! { recv(kafka_commands_out) -> co =>
match co.expect("kafka co 0 select") {
BackendCommands::FindPublicGame(_) => assert!(true),
_ => assert!(false)
}
}

select! { recv(redis_commands_out) -> co =>
match co.expect("redis co 0 select") {
BackendCommands::FindPublicGame(_) => assert!(true),
_ => assert!(false)
}
}

select! { recv(kafka_commands_out) -> co =>
match co.expect("kafka co 1 select") {
BackendCommands::AttachBot(_) => assert!(true),
_ => assert!(false)
}
}

select! { recv(redis_commands_out) -> co =>
match co.expect("redis co 1 select") {
BackendCommands::AttachBot(_) => assert!(true),
_ => assert!(false)
}
}
}
}
7 changes: 7 additions & 0 deletions gateway/src/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod doubler;
mod start;

use crate::backend_events::{BackendEvents, KafkaShutdownEvent};
use crate::idle_status::KafkaActivityObserved;
pub use doubler::double_commands;
pub use start::{start_all, BackendInitOptions};
49 changes: 49 additions & 0 deletions gateway/src/backend/start.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::backend_commands::BackendCommands;
use crate::kafka_io;
use crate::redis_io;

use crossbeam_channel::{unbounded, Receiver, Sender};
use futures::executor::block_on;
use std::thread;

use super::*;

pub fn start_all(opts: BackendInitOptions) {
let (kafka_commands_in, kafka_commands_out): (
Sender<BackendCommands>,
Receiver<BackendCommands>,
) = unbounded();

let (redis_commands_in, redis_commands_out): (
Sender<BackendCommands>,
Receiver<BackendCommands>,
) = unbounded();

thread::spawn(move || redis_io::xadd_commands(redis_commands_out, &redis_io::create_pool()));

let bei = opts.backend_events_in.clone();
thread::spawn(move || redis_io::stream::process(bei));

let soc = opts.session_commands_out;
thread::spawn(move || {
double_commands(super::doubler::DoublerOpts {
session_commands_out: soc,
kafka_commands_in,
redis_commands_in,
})
});

block_on(kafka_io::start(
opts.backend_events_in.clone(),
opts.shutdown_in.clone(),
opts.kafka_activity_in.clone(),
kafka_commands_out,
))
}

pub struct BackendInitOptions {
pub backend_events_in: Sender<BackendEvents>,
pub shutdown_in: Sender<KafkaShutdownEvent>,
pub kafka_activity_in: Sender<KafkaActivityObserved>,
pub session_commands_out: Receiver<BackendCommands>,
}
21 changes: 11 additions & 10 deletions gateway/src/kafka_commands.rs → gateway/src/backend_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use serde_derive::{Deserialize, Serialize};
use crate::model::*;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct JoinPrivateGameKafkaCommand {
pub struct JoinPrivateGameBackendCommand {
#[serde(rename = "gameId")]
pub game_id: GameId,
#[serde(rename = "clientId")]
Expand All @@ -13,15 +13,15 @@ pub struct JoinPrivateGameKafkaCommand {
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FindPublicGameKafkaCommand {
pub struct FindPublicGameBackendCommand {
#[serde(rename = "clientId")]
pub client_id: ClientId,
#[serde(rename = "sessionId")]
pub session_id: SessionId,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChooseColorPrefKafkaCommand {
pub struct ChooseColorPrefBackendCommand {
#[serde(rename = "clientId")]
pub client_id: ClientId,
#[serde(rename = "colorPref")]
Expand All @@ -35,7 +35,7 @@ pub struct ChooseColorPrefKafkaCommand {
/// We omit specifying the game ID here, and
/// let game lobby choose it for us.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CreateGameKafkaCommand {
pub struct CreateGameBackendCommand {
#[serde(rename = "clientId")]
pub client_id: ClientId,
pub visibility: Visibility,
Expand Down Expand Up @@ -73,15 +73,16 @@ pub struct QuitGameCommand {
pub game_id: GameId,
}

#[derive(Serialize, Deserialize, Debug)]
pub enum KafkaCommands {
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum BackendCommands {
MakeMove(MakeMoveCommand),
ProvideHistory(ProvideHistoryCommand),
JoinPrivateGame(JoinPrivateGameKafkaCommand),
FindPublicGame(FindPublicGameKafkaCommand),
CreateGame(CreateGameKafkaCommand),
ChooseColorPref(ChooseColorPrefKafkaCommand),
JoinPrivateGame(JoinPrivateGameBackendCommand),
FindPublicGame(FindPublicGameBackendCommand),
CreateGame(CreateGameBackendCommand),
ChooseColorPref(ChooseColorPrefBackendCommand),
ClientHeartbeat(ClientHeartbeat),
SessionDisconnected(SessionDisconnected),
QuitGame(QuitGameCommand),
AttachBot(micro_model_bot::gateway::AttachBot),
}
50 changes: 27 additions & 23 deletions gateway/src/kafka_events.rs → gateway/src/backend_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,32 @@ use crate::compact_ids::CompactId;
use crate::model::*;

#[derive(Debug)]
pub enum KafkaEvents {
pub enum BackendEvents {
MoveMade(MoveMadeEvent),
MoveRejected(MoveRejectedEvent),
HistoryProvided(HistoryProvidedEvent),
GameReady(GameReadyKafkaEvent),
PrivateGameRejected(PrivateGameRejectedKafkaEvent),
WaitForOpponent(WaitForOpponentKafkaEvent),
GameReady(GameReadyBackendEvent),
PrivateGameRejected(PrivateGameRejectedBackendEvent),
WaitForOpponent(WaitForOpponentBackendEvent),
ColorsChosen(ColorsChosenEvent),
BotAttached(micro_model_bot::gateway::BotAttached),
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ShutdownEvent(pub SystemTime);
pub struct KafkaShutdownEvent(pub SystemTime);

impl KafkaEvents {
impl BackendEvents {
pub fn to_client_event(self) -> ClientEvents {
match self {
KafkaEvents::MoveMade(m) => ClientEvents::MoveMade(m),
KafkaEvents::MoveRejected(m) => ClientEvents::MoveRejected(m),
KafkaEvents::HistoryProvided(h) => ClientEvents::HistoryProvided(h),
BackendEvents::MoveMade(m) => ClientEvents::MoveMade(m),
BackendEvents::MoveRejected(m) => ClientEvents::MoveRejected(m),
BackendEvents::HistoryProvided(h) => ClientEvents::HistoryProvided(h),
// Dummy impl, don't trust it
KafkaEvents::ColorsChosen(c) => ClientEvents::YourColor(YourColorEvent {
BackendEvents::ColorsChosen(c) => ClientEvents::YourColor(YourColorEvent {
game_id: c.game_id,
your_color: Player::BLACK,
}),
KafkaEvents::GameReady(GameReadyKafkaEvent {
BackendEvents::GameReady(GameReadyBackendEvent {
game_id,
event_id,
board_size,
Expand All @@ -41,14 +42,14 @@ impl KafkaEvents {
event_id,
board_size,
}),
KafkaEvents::PrivateGameRejected(p) => {
BackendEvents::PrivateGameRejected(p) => {
ClientEvents::PrivateGameRejected(PrivateGameRejectedClientEvent {
game_id: CompactId::encode(p.game_id),
event_id: p.event_id,
})
}

KafkaEvents::WaitForOpponent(WaitForOpponentKafkaEvent {
BackendEvents::WaitForOpponent(WaitForOpponentBackendEvent {
game_id,
session_id: _,
event_id,
Expand All @@ -65,24 +66,27 @@ impl KafkaEvents {
link,
})
}

BackendEvents::BotAttached(ba) => ClientEvents::BotAttached(ba),
}
}

pub fn game_id(&self) -> GameId {
match self {
KafkaEvents::MoveMade(e) => e.game_id,
KafkaEvents::MoveRejected(e) => e.game_id,
KafkaEvents::HistoryProvided(e) => e.game_id,
KafkaEvents::GameReady(e) => e.game_id,
KafkaEvents::PrivateGameRejected(e) => e.game_id,
KafkaEvents::WaitForOpponent(e) => e.game_id,
KafkaEvents::ColorsChosen(e) => e.game_id,
BackendEvents::MoveMade(e) => e.game_id,
BackendEvents::MoveRejected(e) => e.game_id,
BackendEvents::HistoryProvided(e) => e.game_id,
BackendEvents::GameReady(e) => e.game_id,
BackendEvents::PrivateGameRejected(e) => e.game_id,
BackendEvents::WaitForOpponent(e) => e.game_id,
BackendEvents::ColorsChosen(e) => e.game_id,
BackendEvents::BotAttached(e) => e.game_id.0,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GameReadyKafkaEvent {
pub struct GameReadyBackendEvent {
#[serde(rename = "gameId")]
pub game_id: GameId,
pub sessions: GameSessions,
Expand All @@ -93,7 +97,7 @@ pub struct GameReadyKafkaEvent {
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WaitForOpponentKafkaEvent {
pub struct WaitForOpponentBackendEvent {
#[serde(rename = "gameId")]
pub game_id: GameId,
#[serde(rename = "sessionId")]
Expand All @@ -104,7 +108,7 @@ pub struct WaitForOpponentKafkaEvent {
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PrivateGameRejectedKafkaEvent {
pub struct PrivateGameRejectedBackendEvent {
#[serde(rename = "gameId")]
pub game_id: GameId,
#[serde(rename = "clientId")]
Expand Down
Loading

0 comments on commit 1b045a6

Please sign in to comment.