From f2c40ad35234b1a46dda8cdc8ce0f197a2e9eeae Mon Sep 17 00:00:00 2001 From: Ross Jones Date: Fri, 12 Jan 2024 12:43:46 +0000 Subject: [PATCH 1/4] Adds bacalhau support Adds a destination node for bacalhau that is able to submit jobs to a local requester node, using the orchestrator API. Each instance of the destination node is configured with the location of the jobstore, a local directory containing .yaml files describing jobs. It should also be provided with the job name (one of the .yaml files) so that it might be configured with the following ``` jobstore: /tmp/jobstore job: process ``` and will load /tmp/jobstore/process.yaml as the job specification. Once loaded the job specification can be templated using handlebars syntax, so that {{key}} will be replaced with values from the 'key' field in the input message. --- .gitignore | 2 + Cargo.lock | 101 +++++++++++++ Cargo.toml | 1 + common/src/lib.rs | 9 ++ console/components/Flow/index.tsx | 21 +-- console/components/nodes.tsx | 106 +++++++++++++- myceliald/Cargo.toml | 7 +- myceliald/config.example.toml | 7 + myceliald/src/constructors/bacalhau.rs | 28 ++++ myceliald/src/constructors/mod.rs | 1 + myceliald/src/runtime.rs | 4 + .../section/section_impls/bacalhau/Cargo.toml | 20 +++ .../section/section_impls/bacalhau/src/api.rs | 49 +++++++ .../section_impls/bacalhau/src/destination.rs | 136 ++++++++++++++++++ .../section_impls/bacalhau/src/jobstore.rs | 103 +++++++++++++ .../section/section_impls/bacalhau/src/lib.rs | 69 +++++++++ .../bacalhau/testdata/process.yaml | 10 ++ 17 files changed, 659 insertions(+), 15 deletions(-) create mode 100644 myceliald/src/constructors/bacalhau.rs create mode 100644 pipe/section/section_impls/bacalhau/Cargo.toml create mode 100644 pipe/section/section_impls/bacalhau/src/api.rs create mode 100644 pipe/section/section_impls/bacalhau/src/destination.rs create mode 100644 pipe/section/section_impls/bacalhau/src/jobstore.rs create mode 100644 pipe/section/section_impls/bacalhau/src/lib.rs create mode 100644 pipe/section/section_impls/bacalhau/testdata/process.yaml diff --git a/.gitignore b/.gitignore index 6985cf1b..5ea5efc1 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,5 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +.vscode/ diff --git a/Cargo.lock b/Cargo.lock index f412dbf1..371a2b89 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -702,6 +702,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "bacalhau" +version = "0.1.0" +dependencies = [ + "handlebars", + "reqwest", + "section", + "serde", + "serde_json", + "serde_yaml", + "stub", + "tokio", + "tokio-stream", + "tokio-util", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -1547,6 +1563,20 @@ dependencies = [ "num-traits", ] +[[package]] +name = "handlebars" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faa67bab9ff362228eb3d00bd024a4965d8231bbb7921167f0cfa66c6626b225" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2175,6 +2205,7 @@ name = "myceliald" version = "0.2.0" dependencies = [ "anyhow", + "bacalhau", "base64 0.21.5", "clap", "common", @@ -2602,6 +2633,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f200d8d83c44a45b21764d1916299752ca035d15ecd46faca3e9a2a2bf6ad06" +dependencies = [ + "memchr", + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcd6ab1236bbdb3a49027e920e693192ebfe8913f6d60e294de57463a493cfde" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a31940305ffc96863a735bef7c7994a00b325a7138fdbc5bda0f1a0476d3275" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.48", +] + +[[package]] +name = "pest_meta" +version = "2.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7ff62f5259e53b78d1af898941cdcdccfae7385cf7d793a6e55de5d05bb4b7d" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.3" @@ -3392,6 +3468,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1bf28c79a99f70ee1f1d83d10c875d2e70618417fda01ad1785e027579d9d38" +dependencies = [ + "indexmap 2.1.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "server" version = "0.2.0" @@ -4321,6 +4410,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + [[package]] name = "unicase" version = "2.7.0" @@ -4369,6 +4464,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unsafe-libyaml" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" + [[package]] name = "untrusted" version = "0.7.1" diff --git a/Cargo.toml b/Cargo.toml index 2d3cf265..b30f5b34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "pipe/runtime", "pipe/arrow_msg", "pipe/section", + "pipe/section/section_impls/bacalhau", "pipe/section/section_impls/hello_world", "pipe/section/section_impls/sqlite_connector", "pipe/section/section_impls/excel_connector", diff --git a/common/src/lib.rs b/common/src/lib.rs index de7e9bfb..12192772 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -57,6 +57,7 @@ pub enum Destination { Sqlite_Connector(SqliteDestinationConfig), Snowflake(SnowflakeDestinationConfig), Sqlite_Physical_Replication(SqlitePhysicalReplicationDestinationConfig), + Bacalhau(BacalhauDestinationConfig), Hello_World(HelloWorldDestinationConfig), Kafka(KafkaDestinationConfig), Postgres_Connector(PostgresConnectorDestinationConfig), @@ -149,6 +150,14 @@ pub struct SqlitePhysicalReplicationSourceConfig { // database path } +#[derive(Serialize, Deserialize, Debug, Clone, Default)] +pub struct BacalhauDestinationConfig { + #[serde(flatten)] + pub common_attrs: CommonAttrs, + pub job: String, + pub jobstore: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct HelloWorldSourceConfig { #[serde(flatten)] diff --git a/console/components/Flow/index.tsx b/console/components/Flow/index.tsx index 88c1f0fe..aed0a4ef 100644 --- a/console/components/Flow/index.tsx +++ b/console/components/Flow/index.tsx @@ -40,6 +40,7 @@ import { KafkaSourceNode, SnowflakeSourceNode, SnowflakeDestinationNode, + BacalhauDestinationNode, HelloWorldSourceNode, HelloWorldDestinationNode, ExcelSourceNode, @@ -75,11 +76,10 @@ const useStyles = createStyles((theme) => ({ marginBottom: theme.spacing.md, "&:not(:last-of-type)": { - borderBottom: `${rem(1)} solid ${ - theme.colorScheme === "dark" - ? theme.colors.dark[4] - : theme.colors.gray[3] - }`, + borderBottom: `${rem(1)} solid ${theme.colorScheme === "dark" + ? theme.colors.dark[4] + : theme.colors.gray[3] + }`, }, }, @@ -186,6 +186,7 @@ const nodeTypes = { snowflake_destination: SnowflakeDestinationNode, hello_world_source: HelloWorldSourceNode, hello_world_destination: HelloWorldDestinationNode, + bacalhau_destination: BacalhauDestinationNode, postgres_connector_source: PostgresConnectorSourceNode, postgres_connector_destination: PostgresConnectorDestinationNode, mysql_connector_destination: MysqlConnectorDestinationNode, @@ -286,7 +287,7 @@ function NavbarSearch(props: NavbarSearchProps) { onDragStart={(event) => onDragStart(event, null, source, source)} draggable > - Mycelial Server + Mycelial Server , ]; }; @@ -345,7 +346,7 @@ async function getConfigs(token: string) { }); const result = await response.json(); return result; - } catch (error) {} + } catch (error) { } } const dagreGraph = new dagre.graphlib.Graph(); @@ -568,7 +569,7 @@ function Flow() { if (edge.data?.id) { let payload = { - configs: [{id: edge.data.id, pipe: pipe}] + configs: [{ id: edge.data.id, pipe: pipe }] } try { const response = await fetch("/api/pipe", { @@ -586,7 +587,7 @@ function Flow() { } } else { let payload = { - configs: [{pipe: pipe}] + configs: [{ pipe: pipe }] } const response = await fetch("/api/pipe", { method: "POST", @@ -609,7 +610,7 @@ function Flow() { id: id, }, } - } + } return ed; }); }) diff --git a/console/components/nodes.tsx b/console/components/nodes.tsx index 49b91294..f7bf9c73 100644 --- a/console/components/nodes.tsx +++ b/console/components/nodes.tsx @@ -16,6 +16,107 @@ import styles from "@/components/Flow/Flow.module.css"; import { ClientContext } from "./context/clientContext"; import { ClientContextType } from "./@types/client"; + +const BacalhauDestinationNode: FC = memo(({ id, data, selected }) => { + const instance = useReactFlow(); + const { clients } = useContext(ClientContext) as ClientContextType; + + let initialValues = useMemo(() => { + return { + client: data.client ? data.client : "-", + job: data.job ? data.job : "job", + jobstore: data.jobstore ? data.jobstore : "jobstore", + }; + }, []); + + const handleChange = useCallback((name: string, value: string) => { + instance.setNodes((nodes) => + nodes.map((node) => { + if (node.id === id) { + node.data = { + ...node.data, + [name]: value, + }; + } + + return node; + }), + ); + }, []); + + useEffect(() => { + handleChange("client", initialValues.client); + handleChange("job", initialValues.job); + handleChange("jobstore", initialValues.jobstore); + }, []); + + let classNames = `${styles.customNode} `; + if (selected) { + classNames = classNames + `${styles.selected}`; + } + + const removeNode = useCallback((id: string) => { + const node = instance.getNode(id); + if (node === undefined) { + return; + } + instance.deleteElements({ + edges: getConnectedEdges([node], []), + nodes: [node], + }); + }, []); + + return ( +
+
+

HelloWorld Destination

+ +