Skip to content

Commit

Permalink
Price Feed and Sink example
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Apr 23, 2024
1 parent b60cf75 commit 23ad6ec
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
members = [
".",
"examples",
"examples/price-feed",
"benchmark",
"sea-streamer-file",
"sea-streamer-kafka",
Expand Down
5 changes: 4 additions & 1 deletion examples/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# SeaStreamer Examples

This crate serves as a demo of SeaStreamer and also can be a starting point for you to develop your stream processors.
This crate serves as a demo of SeaStreamer and also can be a starting point for you to develop your stream processors, while demonstrating various stream processing techniques.

This crate works for both `tokio` and `async-std`, and streams to `kafka` and `stdio`.

Expand All @@ -11,6 +11,9 @@ This crate works for both `tokio` and `async-std`, and streams to `kafka` and `s
+ `buffered`: An advanced stream processor with internal buffering and batch processing
+ `blocking`: An advanced stream processor for handling blocking / CPU-bound tasks

+ `price-feed`: A websocket to Redis / Kafka stream producer
+ `sea-orm-sink`: A Redis / Kafka to SQLite data sink

## Running the basic processor example

With Kafka:
Expand Down
19 changes: 19 additions & 0 deletions examples/price-feed/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "sea-streamer-price-feed"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
anyhow = "1"
async-tungstenite = { version = "0.24", features = ["tokio-runtime", "tokio-native-tls"] }
clap = { version = "4.5", features = ["derive"] }
rust_decimal = "1.34"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }

[dependencies.sea-streamer]
path = "../.." # remove this line in your own project
version = "0.3"
features = ["redis", "socket", "json"]
19 changes: 19 additions & 0 deletions examples/price-feed/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Price Feed

This example demonstrates how to subscribe to a real-time websocket data feed and stream to Redis / Kafka.

As an example, we subscribe to the `GBP/USD` price feed from Kraken, documentation can be found at https://docs.kraken.com/websockets/#message-spread.

It will stream to localhost Redis by default. Stream key will be named `GBP_USD`.

```sh
cargo run
```

Here is a sample message serialized to JSON:

```json
{"spread":{"bid":"1.23150","ask":"1.23166","timestamp":"2024-04-22T11:24:41.461661","bid_vol":"40.55300552","ask_vol":"315.04699448"},"channel_name":"spread","pair":"GBP/USD"}
```

#### NOT FINANCIAL ADVICE: FOR EDUCATIONAL AND INFORMATIONAL PURPOSES ONLY
119 changes: 119 additions & 0 deletions examples/price-feed/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use anyhow::{bail, Result};
use async_tungstenite::tungstenite::Message;
use clap::Parser;
use rust_decimal::Decimal;
use sea_streamer::{
export::futures::{SinkExt, StreamExt},
Producer, SeaProducer, SeaStreamer, Streamer, StreamerUri, Timestamp, TIMESTAMP_FORMAT,
};
use serde::{Deserialize, Serialize};

#[derive(Debug, Parser)]
struct Args {
#[clap(long, help = "Streamer URI", default_value = "redis://localhost")]
streamer: StreamerUri,
}

#[derive(Debug, Serialize, Deserialize)]
struct SpreadMessage {
#[allow(dead_code)]
#[serde(skip_serializing)]
channel_id: u32,
spread: Spread,
channel_name: String,
pair: String,
}

#[derive(Debug, Serialize, Deserialize)]
struct Spread {
bid: Decimal,
ask: Decimal,
#[serde(with = "timestamp_serde")]
timestamp: Timestamp,
bid_vol: Decimal,
ask_vol: Decimal,
}

#[tokio::main]
async fn main() -> Result<()> {
let Args { streamer } = Args::parse();

println!("Connecting ..");
let (mut ws, _) = async_tungstenite::tokio::connect_async("wss://ws.kraken.com/").await?;
println!("Connected.");

ws.send(Message::Text(
r#"{
"event": "subscribe",
"pair": [
"GBP/USD"
],
"subscription": {
"name": "spread"
}
}"#
.to_owned(),
))
.await?;

loop {
match ws.next().await {
Some(Ok(Message::Text(data))) => {
println!("{data}");
if data.contains(r#""status":"subscribed""#) {
println!("Subscribed.");
break;
}
}
e => bail!("Unexpected message {e:?}"),
}
}

let streamer = SeaStreamer::connect(streamer, Default::default()).await?;
let producer: SeaProducer = streamer
.create_producer("GBP_USD".parse()?, Default::default())
.await?;

loop {
match ws.next().await {
Some(Ok(Message::Text(data))) => {
if data == r#"{"event":"heartbeat"}"# {
continue;
}
let spread: SpreadMessage = serde_json::from_str(&data)?;
let message = serde_json::to_string(&spread)?;
println!("{message}");
producer.send(message)?;
}
Some(Err(e)) => bail!("Socket error: {e}"),
None => bail!("Stream ended"),
e => bail!("Unexpected message {e:?}"),
}
}
}

mod timestamp_serde {
use super::*;

pub fn deserialize<'de, D>(deserializer: D) -> Result<Timestamp, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = <&str>::deserialize(deserializer)?;
let value: Decimal = s.parse().map_err(serde::de::Error::custom)?;
Timestamp::from_unix_timestamp_nanos(
(value * Decimal::from(1_000_000_000)).try_into().unwrap(),
)
.map_err(serde::de::Error::custom)
}

pub fn serialize<S>(v: &Timestamp, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(
&v.format(TIMESTAMP_FORMAT)
.map_err(serde::ser::Error::custom)?,
)
}
}
25 changes: 25 additions & 0 deletions examples/sea-orm-sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
[workspace]

[package]
name = "sea-streamer-sea-orm-sink"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
anyhow = { version = "1" }
clap = { version = "4.5", features = ["derive"] }
env_logger = { version = "0.9" }
log = { version = "0.4", default-features = false }
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
tokio = { version = "1", features = ["full"] }

[dependencies.sea-orm]
version = "1.0.0-rc.3"
features = ["sqlx-sqlite", "runtime-tokio-native-tls"]

[dependencies.sea-streamer]
path = "../.." # remove this line in your own project
version = "0.3"
features = ["redis", "socket", "json", "runtime-tokio"]
Binary file added examples/sea-orm-sink/GBP_USD.sqlite
Binary file not shown.
11 changes: 11 additions & 0 deletions examples/sea-orm-sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# SeaORM Data Sink

This example demonstrates how to consume a stream from Redis / Kafka and store the data to MySQL / Postgres / SQLite / SQL Server.

It will create the table automatically. You have to run the `price-feed` example first. It will subscribe to `GBP_USD` and saves to `GBP_USD.sqlite` by default. Incoming JSON messages will be deserialized and inserted into database.

```sh
cargo run
```

A more complex example with buffering and periodic flush can be found at https://github.com/SeaQL/FireDBG.for.Rust/blob/main/indexer/src/main.rs
69 changes: 69 additions & 0 deletions examples/sea-orm-sink/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
mod spread;

use anyhow::Result;
use clap::Parser;
use sea_orm::{
ActiveModelTrait, ConnectOptions, ConnectionTrait, Database, DbConn, DbErr, IntoActiveModel,
NotSet, Schema,
};
use sea_streamer::{Buffer, Consumer, Message, SeaStreamer, StreamKey, Streamer, StreamerUri};
use serde::Deserialize;

#[derive(Debug, Parser)]
struct Args {
#[clap(long, help = "Streamer URI", default_value = "redis://localhost")]
streamer: StreamerUri,
#[clap(long, help = "Stream Key", default_value = "GBP_USD")]
stream_key: StreamKey,
}

#[derive(Deserialize)]
struct Item {
spread: spread::Model,
}

#[tokio::main]
async fn main() -> Result<()> {
env_logger::init();

let Args {
streamer,
stream_key,
} = Args::parse();

let mut opt = ConnectOptions::new(format!("sqlite://{}.sqlite?mode=rwc", stream_key));
opt.max_connections(1).sqlx_logging(false);
let db = Database::connect(opt).await?;
create_tables(&db).await?;

let streamer = SeaStreamer::connect(streamer, Default::default()).await?;
let consumer = streamer
.create_consumer(&[stream_key], Default::default())
.await?;

loop {
let message = consumer.next().await?;
let payload = message.message();
let json = payload.as_str()?;
log::info!("{json}");
let item: Item = serde_json::from_str(json)?;
let mut spread = item.spread.into_active_model();
spread.id = NotSet;
spread.save(&db).await?;
}
}

async fn create_tables(db: &DbConn) -> Result<(), DbErr> {
let builder = db.get_database_backend();
let schema = Schema::new(builder);

let stmt = builder.build(
schema
.create_table_from_entity(spread::Entity)
.if_not_exists(),
);
log::info!("{stmt}");
db.execute(stmt).await?;

Ok(())
}
20 changes: 20 additions & 0 deletions examples/sea-orm-sink/src/spread.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use sea_orm::entity::prelude::*;
use serde::Deserialize;

#[derive(Debug, Clone, PartialEq, Eq, DeriveEntityModel, Deserialize)]
#[sea_orm(table_name = "event")]
pub struct Model {
#[sea_orm(primary_key)]
#[serde(default)]
pub id: i32,
pub timestamp: String,
pub bid: String,
pub ask: String,
pub bid_vol: String,
pub ask_vol: String,
}

#[derive(Debug, Copy, Clone, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

0 comments on commit 23ad6ec

Please sign in to comment.