Skip to content

Commit f2c40ad

Browse files
committed
Adds bacalhau support
Adds a destination node for bacalhau that is able to submit jobs to a local requester node, using the orchestrator API. Each instance of the destination node is configured with the location of the jobstore, a local directory containing .yaml files describing jobs. It should also be provided with the job name (one of the .yaml files) so that it might be configured with the following ``` jobstore: /tmp/jobstore job: process ``` and will load /tmp/jobstore/process.yaml as the job specification. Once loaded the job specification can be templated using handlebars syntax, so that {{key}} will be replaced with values from the 'key' field in the input message.
1 parent e00581e commit f2c40ad

File tree

17 files changed

+659
-15
lines changed

17 files changed

+659
-15
lines changed

.gitignore

+2
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ Cargo.lock
1212

1313
# MSVC Windows builds of rustc generate these, which store debugging information
1414
*.pdb
15+
16+
.vscode/

Cargo.lock

+101
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ members = [
66
"pipe/runtime",
77
"pipe/arrow_msg",
88
"pipe/section",
9+
"pipe/section/section_impls/bacalhau",
910
"pipe/section/section_impls/hello_world",
1011
"pipe/section/section_impls/sqlite_connector",
1112
"pipe/section/section_impls/excel_connector",

common/src/lib.rs

+9
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub enum Destination {
5757
Sqlite_Connector(SqliteDestinationConfig),
5858
Snowflake(SnowflakeDestinationConfig),
5959
Sqlite_Physical_Replication(SqlitePhysicalReplicationDestinationConfig),
60+
Bacalhau(BacalhauDestinationConfig),
6061
Hello_World(HelloWorldDestinationConfig),
6162
Kafka(KafkaDestinationConfig),
6263
Postgres_Connector(PostgresConnectorDestinationConfig),
@@ -149,6 +150,14 @@ pub struct SqlitePhysicalReplicationSourceConfig {
149150
// database path
150151
}
151152

153+
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
154+
pub struct BacalhauDestinationConfig {
155+
#[serde(flatten)]
156+
pub common_attrs: CommonAttrs,
157+
pub job: String,
158+
pub jobstore: String,
159+
}
160+
152161
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
153162
pub struct HelloWorldSourceConfig {
154163
#[serde(flatten)]

console/components/Flow/index.tsx

+11-10
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import {
4040
KafkaSourceNode,
4141
SnowflakeSourceNode,
4242
SnowflakeDestinationNode,
43+
BacalhauDestinationNode,
4344
HelloWorldSourceNode,
4445
HelloWorldDestinationNode,
4546
ExcelSourceNode,
@@ -75,11 +76,10 @@ const useStyles = createStyles((theme) => ({
7576
marginBottom: theme.spacing.md,
7677

7778
"&:not(:last-of-type)": {
78-
borderBottom: `${rem(1)} solid ${
79-
theme.colorScheme === "dark"
80-
? theme.colors.dark[4]
81-
: theme.colors.gray[3]
82-
}`,
79+
borderBottom: `${rem(1)} solid ${theme.colorScheme === "dark"
80+
? theme.colors.dark[4]
81+
: theme.colors.gray[3]
82+
}`,
8383
},
8484
},
8585

@@ -186,6 +186,7 @@ const nodeTypes = {
186186
snowflake_destination: SnowflakeDestinationNode,
187187
hello_world_source: HelloWorldSourceNode,
188188
hello_world_destination: HelloWorldDestinationNode,
189+
bacalhau_destination: BacalhauDestinationNode,
189190
postgres_connector_source: PostgresConnectorSourceNode,
190191
postgres_connector_destination: PostgresConnectorDestinationNode,
191192
mysql_connector_destination: MysqlConnectorDestinationNode,
@@ -286,7 +287,7 @@ function NavbarSearch(props: NavbarSearchProps) {
286287
onDragStart={(event) => onDragStart(event, null, source, source)}
287288
draggable
288289
>
289-
Mycelial Server
290+
Mycelial Server
290291
</div>,
291292
];
292293
};
@@ -345,7 +346,7 @@ async function getConfigs(token: string) {
345346
});
346347
const result = await response.json();
347348
return result;
348-
} catch (error) {}
349+
} catch (error) { }
349350
}
350351

351352
const dagreGraph = new dagre.graphlib.Graph();
@@ -568,7 +569,7 @@ function Flow() {
568569

569570
if (edge.data?.id) {
570571
let payload = {
571-
configs: [{id: edge.data.id, pipe: pipe}]
572+
configs: [{ id: edge.data.id, pipe: pipe }]
572573
}
573574
try {
574575
const response = await fetch("/api/pipe", {
@@ -586,7 +587,7 @@ function Flow() {
586587
}
587588
} else {
588589
let payload = {
589-
configs: [{pipe: pipe}]
590+
configs: [{ pipe: pipe }]
590591
}
591592
const response = await fetch("/api/pipe", {
592593
method: "POST",
@@ -609,7 +610,7 @@ function Flow() {
609610
id: id,
610611
},
611612
}
612-
}
613+
}
613614
return ed;
614615
});
615616
})

console/components/nodes.tsx

+104-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,107 @@ import styles from "@/components/Flow/Flow.module.css";
1616
import { ClientContext } from "./context/clientContext";
1717
import { ClientContextType } from "./@types/client";
1818

19+
20+
const BacalhauDestinationNode: FC<NodeProps> = memo(({ id, data, selected }) => {
21+
const instance = useReactFlow();
22+
const { clients } = useContext(ClientContext) as ClientContextType;
23+
24+
let initialValues = useMemo(() => {
25+
return {
26+
client: data.client ? data.client : "-",
27+
job: data.job ? data.job : "job",
28+
jobstore: data.jobstore ? data.jobstore : "jobstore",
29+
};
30+
}, []);
31+
32+
const handleChange = useCallback((name: string, value: string) => {
33+
instance.setNodes((nodes) =>
34+
nodes.map((node) => {
35+
if (node.id === id) {
36+
node.data = {
37+
...node.data,
38+
[name]: value,
39+
};
40+
}
41+
42+
return node;
43+
}),
44+
);
45+
}, []);
46+
47+
useEffect(() => {
48+
handleChange("client", initialValues.client);
49+
handleChange("job", initialValues.job);
50+
handleChange("jobstore", initialValues.jobstore);
51+
}, []);
52+
53+
let classNames = `${styles.customNode} `;
54+
if (selected) {
55+
classNames = classNames + `${styles.selected}`;
56+
}
57+
58+
const removeNode = useCallback((id: string) => {
59+
const node = instance.getNode(id);
60+
if (node === undefined) {
61+
return;
62+
}
63+
instance.deleteElements({
64+
edges: getConnectedEdges([node], []),
65+
nodes: [node],
66+
});
67+
}, []);
68+
69+
return (
70+
<div className={classNames}>
71+
<div className=" grid grid-cols-1 gap-x-6 gap-y-2">
72+
<h2 className="text-slate-400 font-normal">HelloWorld Destination</h2>
73+
<button
74+
onClick={() => {
75+
if (confirm("Are you sure you want to delete this node?")) {
76+
removeNode(id);
77+
}
78+
}}
79+
type="button"
80+
className="absolute right-1 top-1 rounded bg-red-200 text-white shadow-sm hover:bg-red-600 focus-visible:outline focus-visible:outline-2 focus-visible:outline-offset-2 focus-visible:outline-red-800"
81+
title="delete"
82+
>
83+
<XMarkIcon className="h-5 w-5" aria-hidden="true" />
84+
</button>
85+
<Select
86+
name="client"
87+
label="Client"
88+
placeholder="Pick one"
89+
defaultValue={initialValues.client}
90+
options={(clients || []).map((c) => c.id)}
91+
onChange={(value) => {
92+
handleChange("client", value || "");
93+
}}
94+
/>
95+
<TextInput
96+
name="jobstore"
97+
label="Job Store"
98+
placeholder={initialValues.jobstore}
99+
defaultValue={initialValues.jobstore}
100+
onChange={(event) => {
101+
handleChange("jobstore", event.currentTarget.value)
102+
}}
103+
/>
104+
<TextInput
105+
name="job"
106+
label="Job"
107+
placeholder={initialValues.job}
108+
defaultValue={initialValues.job}
109+
onChange={(event) => {
110+
handleChange("job", event.currentTarget.value)
111+
}}
112+
/>
113+
114+
<Handle type="target" position={Position.Left} id={id} />
115+
</div>
116+
</div>
117+
);
118+
});
119+
19120
const HelloWorldDestinationNode: FC<NodeProps> = memo(({ id, data, selected }) => {
20121
const instance = useReactFlow();
21122
const { clients } = useContext(ClientContext) as ClientContextType;
@@ -1401,7 +1502,7 @@ const PostgresConnectorDestinationNode: FC<NodeProps> = memo(({ id, data, select
14011502

14021503
let initialValues = useMemo(() => {
14031504
return {
1404-
url: data.url? data.url: "postgres://root:[email protected]:5432/test",
1505+
url: data.url ? data.url : "postgres://root:[email protected]:5432/test",
14051506
client: data.client ? data.client : "-",
14061507
};
14071508
}, []);
@@ -1485,7 +1586,7 @@ const MysqlConnectorDestinationNode: FC<NodeProps> = memo(({ id, data, selected
14851586

14861587
let initialValues = useMemo(() => {
14871588
return {
1488-
url: data.url? data.url: "mysql://root:[email protected]:3306/test",
1589+
url: data.url ? data.url : "mysql://root:[email protected]:3306/test",
14891590
client: data.client ? data.client : "-",
14901591
};
14911592
}, []);
@@ -1745,6 +1846,7 @@ export {
17451846
KafkaDestination,
17461847
SnowflakeSourceNode,
17471848
SnowflakeDestinationNode,
1849+
BacalhauDestinationNode,
17481850
HelloWorldSourceNode,
17491851
HelloWorldDestinationNode,
17501852
PostgresConnectorSourceNode,

0 commit comments

Comments
 (0)