Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: auto clustering for workers #109

Draft
wants to merge 18 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,134 changes: 683 additions & 451 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This project is licensed under MIT License - see the [LICENSE](LICENSE) file for

## Architecture

**Note: this is subject to change during development process.** For up-to-date graph, see [arch.drawio](arch.drawio).
**Note: this is subject to change during development process.**
For up-to-date graph, see [arch.drawio](doc/src/arch.drawio).

![2022-03-25T14:04:34Z.png](https://imagedelivery.net/b21oeeg7p6hqWEI-IA5xDw/885a0e1b-5bcc-41d8-cc1b-45a7c4f6d700/public)
![arch](doc/src/arch.svg)
2 changes: 1 addition & 1 deletion coordinator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
sg-core = { package = "core", path = "../core" }
tap = "1.0"
tarpc = { version = "0.29", features = ["serde1", "tokio1"] }
tarpc = { version = "0.30", features = ["serde1", "tokio1"] }
tokio = { version = "1.20", features = ["rt", "rt-multi-thread", "parking_lot", "time", "net", "macros"] }
tokio-tungstenite = "0.17"
tracing = "0.1"
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mongodb = { version = "2.3.0", features = ["bson-uuid-0_8"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tap = "1.0"
tarpc = { version = "0.29", features = ["serde1", "tokio1"] }
tarpc = { version = "0.30", features = ["serde1", "tokio1"] }
thiserror = "1.0"
tokio = { version = "1.20", features = ["rt"] }
tokio-executor-trait = { version = "2.1", optional = true }
Expand Down
20 changes: 20 additions & 0 deletions core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,26 @@ mod tests {
});
}

#[derive(Deserialize, Config)]
#[config(core = "crate")]
struct ConfigWithInheritAndExplicitDefaultsTwo {
#[config(inherit, default = r#"{ "c": 42 }"#)]
a: Nested,
}

#[test]
fn must_config_with_inherit_and_explicit_defaults_2() {
Jail::expect_with(|_| {
let config = ConfigWithInheritAndExplicitDefaultsTwo::from_env("TEST_").unwrap();

let ConfigWithInheritAndExplicitDefaultsTwo { a: Nested { b, c } } = config;
assert!(!b);
assert_eq!(c, 42);

Ok(())
});
}

#[derive(Deserialize, Config)]
#[config(core = "crate")]
struct ConfigWithFlattenInheritDefaults {
Expand Down
23 changes: 19 additions & 4 deletions core_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ struct Field {
enum Action {
Append(Field),
Merge(proc_macro2::TokenStream),
Wrapped(String, Vec<Action>),
}

fn value_from_actions(
Expand All @@ -159,18 +160,32 @@ fn value_from_actions(
}
}
}
Action::Wrapped(key, actions) => {
let actions = wrap_in_object(serde_json, &value_from_actions(serde_json, actions));
quote! {
dict.insert(#key.to_string(), #actions);
}
}
})
.collect();

quote! {
{
let mut dict = #serde_json::Map::new();
#(#stmts)*
#serde_json::Value::Object(dict)
{
let mut dict = #serde_json::Map::new();
#(#stmts)*
dict
}
}
}
}

fn wrap_in_object(serde_json: &Path, dict: &proc_macro2::TokenStream) -> proc_macro2::TokenStream {
quote! {
#serde_json::Value::Object(#dict)
}
}

/// Example of user-defined [derive mode macro][1]
///
/// [1]: https://doc.rust-lang.org/reference/procedural-macros.html#derive-mode-macros
Expand Down Expand Up @@ -244,7 +259,7 @@ pub fn derive_config(input: TokenStream) -> TokenStream {
)
.collect();

let value = value_from_actions(&serde_json, actions);
let value = wrap_in_object(&serde_json, &value_from_actions(&serde_json, actions));

let struct_ident = input.ident;
let tokens = quote! {
Expand Down
1 change: 1 addition & 0 deletions doc/src/arch.drawio
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
<mxfile host="drawio-plugin" modified="2022-06-29T15:53:39.812Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.45 Safari/537.36" etag="Ge7dQ61n4_Jv5_vZK8uo" version="15.5.4" type="embed"><diagram id="K8gbWWgciXM93NndIorP" name="Page-1"><mxGraphModel dx="987" dy="788" grid="1" gridSize="10" guides="1" tooltips="1" connect="1" arrows="1" fold="1" page="1" pageScale="1" pageWidth="1200" pageHeight="1920" background="#FFFFFF" math="0" shadow="0"><root><mxCell id="0"/><mxCell id="1" parent="0"/><mxCell id="92" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.25;entryDx=0;entryDy=0;strokeColor=#000000;" parent="1" source="2" target="5" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="2" value="Twitter" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=none;fontColor=#000000;" parent="1" vertex="1"><mxGeometry x="875.88" y="100" width="120" height="40" as="geometry"/></mxCell><mxCell id="93" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.5;entryDx=0;entryDy=0;strokeColor=#000000;" parent="1" source="3" target="5" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="3" value="Youtube" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=none;fontColor=#000000;" parent="1" vertex="1"><mxGeometry x="875.88" y="165" width="120" height="40" as="geometry"/></mxCell><mxCell id="91" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0;exitY=0.5;exitDx=0;exitDy=0;entryX=1;entryY=0.75;entryDx=0;entryDy=0;strokeColor=#000000;" parent="1" source="4" target="5" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="4" value="Bilibili" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#e1d5e7;strokeColor=none;fontColor=#000000;" parent="1" vertex="1"><mxGeometry x="875.88" y="230" width="120" height="40" as="geometry"/></mxCell><mxCell id="32" style="edgeStyle=none;html=1;entryX=0.5;entryY=0;entryDx=0;entryDy=0;exitX=0.5;exitY=1;exitDx=0;exitDy=0;exitPerimeter=0;strokeColor=#000000;" parent="1" source="96" target="27" edge="1"><mxGeometry relative="1" as="geometry"><mxPoint x="687.5055555555555" y="430" as="sourcePoint"/></mxGeometry></mxCell><mxCell id="25" style="edgeStyle=none;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=0.5;entryY=0;entryDx=0;entryDy=0;entryPerimeter=0;strokeColor=#000000;" parent="1" source="5" target="96" edge="1"><mxGeometry relative="1" as="geometry"><mxPoint x="687.5099999999998" y="350" as="targetPoint"/></mxGeometry></mxCell><mxCell id="87" value="Tasks" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=0;exitDx=0;exitDy=0;exitPerimeter=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;strokeColor=#000000;fontColor=#000000;labelBackgroundColor=#FFFFFF;" parent="1" source="34" target="5" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="34" value="MongoDB&lt;br&gt;Tasks" style="shape=datastore;whiteSpace=wrap;html=1;boundedLbl=1;backgroundOutline=1;size=15;fillColor=#fad7ac;strokeColor=#b46504;fontColor=#B64504;align=center;" parent="1" vertex="1"><mxGeometry x="195" y="320" width="90" height="80" as="geometry"/></mxCell><mxCell id="40" value="MongoDB&lt;br&gt;VTBs" style="shape=datastore;whiteSpace=wrap;html=1;boundedLbl=1;backgroundOutline=1;size=15;fillColor=#fad7ac;strokeColor=#b46504;fontColor=#B64504;" parent="1" vertex="1"><mxGeometry x="305" y="320" width="90" height="80" as="geometry"/></mxCell><mxCell id="53" style="edgeStyle=none;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;entryX=0;entryY=0.5;entryDx=0;entryDy=0;strokeColor=#000000;" parent="1" source="45" target="27" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="55" value="Add new resource (Task)" style="edgeStyle=none;html=1;exitX=0.25;exitY=0;exitDx=0;exitDy=0;entryX=0.5;entryY=1;entryDx=0;entryDy=0;entryPerimeter=0;strokeColor=#000000;fontColor=#000000;labelBackgroundColor=#FFFFFF;" parent="1" source="45" target="34" edge="1"><mxGeometry x="0.0301" y="-2" relative="1" as="geometry"><mxPoint as="offset"/></mxGeometry></mxCell><mxCell id="110" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=0.16;entryY=-0.06;entryDx=0;entryDy=0;entryPerimeter=0;labelBackgroundColor=none;fontColor=#000000;strokeColor=#000000;" edge="1" parent="1" source="45" target="48"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="45" value="API" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#ffcc99;strokeColor=none;fontColor=#000000;" parent="1" vertex="1"><mxGeometry x="290" y="505" width="120" height="60" as="geometry"/></mxCell><mxCell id="48" value="Users" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#fad9d5;strokeColor=none;fontColor=#000000;" parent="1" vertex="1"><mxGeometry x="255" y="650" width="595" height="50" as="geometry"/></mxCell><mxCell id="58" value="MongoDB&lt;br&gt;Users" style="shape=datastore;whiteSpace=wrap;html=1;boundedLbl=1;backgroundOutline=1;size=15;fillColor=#fad7ac;strokeColor=#b46504;fontColor=#B64504;" parent="1" vertex="1"><mxGeometry x="420" y="320" width="90" height="80" as="geometry"/></mxCell><mxCell id="63" value="" style="endArrow=classic;startArrow=classic;html=1;entryX=0.5;entryY=1;entryDx=0;entryDy=0;entryPerimeter=0;exitX=0.75;exitY=0;exitDx=0;exitDy=0;strokeColor=#000000;fontColor=#000000;labelBackgroundColor=none;" parent="1" source="45" target="58" edge="1"><mxGeometry width="50" height="50" relative="1" as="geometry"><mxPoint x="550" y="460" as="sourcePoint"/><mxPoint x="600" y="410" as="targetPoint"/></mxGeometry></mxCell><mxCell id="64" value="User config" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];labelBackgroundColor=#FFFFFF;fontColor=#000000;" parent="63" vertex="1" connectable="0"><mxGeometry x="-0.1135" y="-1" relative="1" as="geometry"><mxPoint x="-4" y="-9" as="offset"/></mxGeometry></mxCell><mxCell id="65" value="" style="endArrow=classic;startArrow=classic;html=1;entryX=0.5;entryY=1;entryDx=0;entryDy=0;entryPerimeter=0;exitX=0.5;exitY=0;exitDx=0;exitDy=0;strokeColor=#000000;" parent="1" source="45" target="40" edge="1"><mxGeometry width="50" height="50" relative="1" as="geometry"><mxPoint x="550" y="460" as="sourcePoint"/><mxPoint x="600" y="410" as="targetPoint"/></mxGeometry></mxCell><mxCell id="66" value="Retrieve and update VTB info" style="edgeLabel;html=1;align=center;verticalAlign=middle;resizable=0;points=[];fontColor=#000000;labelBackgroundColor=#FFFFFF;" parent="65" vertex="1" connectable="0"><mxGeometry x="0.619" y="1" relative="1" as="geometry"><mxPoint as="offset"/></mxGeometry></mxCell><mxCell id="79" value="" style="group" parent="1" vertex="1" connectable="0"><mxGeometry x="785" y="320" width="110" height="140" as="geometry"/></mxCell><mxCell id="67" value="" style="rounded=0;whiteSpace=wrap;html=1;dashed=1;fillColor=none;strokeColor=#000000;" parent="79" vertex="1"><mxGeometry width="110" height="140" as="geometry"/></mxCell><mxCell id="69" value="Translate" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#bac8d3;strokeColor=none;fontColor=#000000;" parent="79" vertex="1"><mxGeometry x="20" y="80" width="70" height="30" as="geometry"/></mxCell><mxCell id="70" value="MIddlewares" style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;fontColor=#000000;" parent="79" vertex="1"><mxGeometry x="15" y="10" width="80" height="20" as="geometry"/></mxCell><mxCell id="76" value="..." style="text;html=1;align=center;verticalAlign=middle;resizable=0;points=[];autosize=1;strokeColor=none;fillColor=none;" parent="79" vertex="1"><mxGeometry x="40" y="110" width="30" height="20" as="geometry"/></mxCell><mxCell id="68" value="Delay" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#bac8d3;strokeColor=none;fontColor=#000000;" parent="79" vertex="1"><mxGeometry x="20" y="40" width="70" height="30" as="geometry"/></mxCell><mxCell id="90" value="" style="group;fontColor=#000000;" parent="1" vertex="1" connectable="0"><mxGeometry x="607.51" y="70" width="160" height="230" as="geometry"/></mxCell><mxCell id="5" value="" style="rounded=0;whiteSpace=wrap;html=1;dashed=1;fillColor=none;strokeColor=#000000;" parent="90" vertex="1"><mxGeometry width="160" height="230" as="geometry"/></mxCell><mxCell id="6" value="Twitter Worker" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#cce5ff;strokeColor=none;fontColor=#000000;" parent="90" vertex="1"><mxGeometry x="20" y="20" width="120" height="40" as="geometry"/></mxCell><mxCell id="9" value="Youtube Worker" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#cce5ff;strokeColor=none;fontColor=#000000;" parent="90" vertex="1"><mxGeometry x="20" y="70" width="120" height="40" as="geometry"/></mxCell><mxCell id="10" value="Bilibili Worker" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#cce5ff;strokeColor=none;fontColor=#000000;" parent="90" vertex="1"><mxGeometry x="20" y="120" width="120" height="40" as="geometry"/></mxCell><mxCell id="13" value="Worker Cluster" style="text;html=1;strokeColor=none;fillColor=none;align=center;verticalAlign=middle;whiteSpace=wrap;rounded=0;direction=south;fontColor=#000000;" parent="90" vertex="1"><mxGeometry x="20.16" y="180" width="119.67" height="40" as="geometry"/></mxCell><mxCell id="22" value="......" style="text;html=1;strokeColor=none;fillColor=none;align=center;verticalAlign=middle;whiteSpace=wrap;rounded=0;fontColor=#000000;" parent="90" vertex="1"><mxGeometry x="55.81796116504856" y="160" width="48.349514563106794" height="30" as="geometry"/></mxCell><mxCell id="95" value="" style="group" parent="1" vertex="1" connectable="0"><mxGeometry x="522.51" y="480" width="330" height="110" as="geometry"/></mxCell><mxCell id="27" value="" style="rounded=0;whiteSpace=wrap;html=1;dashed=1;fillColor=none;strokeColor=#000000;" parent="95" vertex="1"><mxGeometry width="330" height="110" as="geometry"/></mxCell><mxCell id="28" value="Telegram Bot" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#d0cee2;strokeColor=none;fontColor=#000000;" parent="95" vertex="1"><mxGeometry x="28.529999999999973" y="50" width="92.34" height="40" as="geometry"/></mxCell><mxCell id="29" value="QQ Bot" style="rounded=0;whiteSpace=wrap;html=1;fillColor=#d0cee2;strokeColor=none;fontColor=#000000;" parent="95" vertex="1"><mxGeometry x="142.21000000000004" y="50" width="92.34" height="40" as="geometry"/></mxCell><mxCell id="30" value="......" style="text;html=1;strokeColor=none;fillColor=none;align=center;verticalAlign=middle;whiteSpace=wrap;rounded=0;fontColor=#000000;" parent="95" vertex="1"><mxGeometry x="260.87796116504853" y="55" width="48.349514563106794" height="30" as="geometry"/></mxCell><mxCell id="31" value="IM Integrations" style="text;html=1;strokeColor=none;fillColor=none;align=center;verticalAlign=middle;whiteSpace=wrap;rounded=0;fontColor=#000000;" parent="95" vertex="1"><mxGeometry x="28.53" y="10" width="82.5" height="30" as="geometry"/></mxCell><mxCell id="96" value="Rabbit&lt;br&gt;MQ" style="shape=cylinder3;whiteSpace=wrap;html=1;boundedLbl=1;backgroundOutline=1;size=15;rounded=0;fontColor=#000000;strokeColor=#000000;fillColor=#99CCFF;" parent="1" vertex="1"><mxGeometry x="647.51" y="340" width="80" height="90" as="geometry"/></mxCell><mxCell id="103" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=1;exitY=0.5;exitDx=0;exitDy=0;fontFamily=Helvetica;fontSize=12;fontColor=#000000;strokeColor=#000000;" parent="1" source="101" target="67" edge="1"><mxGeometry relative="1" as="geometry"/></mxCell><mxCell id="101" value="λ" style="ellipse;whiteSpace=wrap;html=1;rounded=0;dashed=1;strokeColor=#36393d;fillColor=#ffcccc;align=center;fontColor=#000000;" parent="1" vertex="1"><mxGeometry x="720" y="380" width="20" height="20" as="geometry"/></mxCell><mxCell id="111" style="edgeStyle=orthogonalEdgeStyle;rounded=0;orthogonalLoop=1;jettySize=auto;html=1;exitX=0.5;exitY=1;exitDx=0;exitDy=0;entryX=0.728;entryY=-0.02;entryDx=0;entryDy=0;entryPerimeter=0;labelBackgroundColor=none;fontColor=#000000;strokeColor=#000000;" edge="1" parent="1" source="27" target="48"><mxGeometry relative="1" as="geometry"/></mxCell></root></mxGraphModel></diagram></mxfile>
3 changes: 3 additions & 0 deletions doc/src/arch.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 3 additions & 4 deletions doc/src/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@ develop and deploy.
The system is designed to be flexible and easy to extend, which composed of multiple modules. Include:

- A centralized MongoDB as shared data source
- A coordinator to distribute tasks
- A worker pool to execute tasks
- A worker cluster to execute tasks
- A message queue to collect and dispatch messages generated by workers
- Multiple middlewares to provide extra functionality
- Multiple clients(bots) to send the message to subscribers
- A Web UI for users to manage their subscribe preference
- An API to manage the system and provide functionality to frontend and bots

![2022-04-08T16:29:56Z.png](https://imagedelivery.net/b21oeeg7p6hqWEI-IA5xDw/9e103fdd-69d2-4662-01e7-e78a229f3800/public)
![arch](arch.svg)

[arch.drawio]: https://github.com/suisei-cn/stargazer-reborn/blob/master/arch.drawio
[arch.drawio]: arch.drawio
13 changes: 13 additions & 0 deletions doc/src/workers/index.md
Original file line number Diff line number Diff line change
@@ -1 +1,14 @@
# Workers

In stargazer-reborn, sources to be traced are called *tasks*.
A task can be a Twitter account, a YouTube channel, a Twitch channel, etc.
All tasks are stored in the `tasks` collection in MongoDB.

Each task is handled by a worker.
Say if there's a task of Twitter account `@suisei_hosimati`, the worker is responsible for fetching the latest tweets of this account, and pushing them to the message queue. And the relation between tasks and workers is one-to-many.

To handle a great amount of tasks, stargazer-reborn uses a worker cluster.
The cluster is heterogeneous in the sense that each worker can handle only one kind of task. However, every worker is *equivalent* if they are of the same kind, i.e. there's no "central" or "master" worker, nor any kind of "coordinator" node.
Each worker connects to the database, the single source of truth, and the message queue, on its own.

To evenly distribute the work load, each worker uses gossip-based SWIM protocol to discover the other workers in the cluster. After that, a consistent hash ring is built to determine the worker responsible for a given task.
4 changes: 2 additions & 2 deletions middlewares/delay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ edition = "2021"
[dependencies]
chrono = "0.4"
color-eyre = "0.6"
diesel = { version = "1.4", features = ["chrono", "sqlite", "r2d2"] }
diesel_migrations = "1.4"
diesel = { version = "2.0", features = ["chrono", "sqlite", "r2d2"] }
diesel_migrations = "2.0"
eyre = "0.6"
figment = { version = "0.10", features = ["env"] }
futures-util = { version = "0.3" }
Expand Down
Loading