diff --git a/Cargo.lock b/Cargo.lock index b78638fd..4fb9e34f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -682,6 +682,9 @@ dependencies = [ name = "bacalhau" version = "0.1.0" dependencies = [ + "arrow 49.0.0", + "arrow-json 49.0.0", + "arrow_msg", "handlebars", "reqwest", "section", diff --git a/pipe/section/section_impls/bacalhau/Cargo.toml b/pipe/section/section_impls/bacalhau/Cargo.toml index 6b850667..7a831fb8 100644 --- a/pipe/section/section_impls/bacalhau/Cargo.toml +++ b/pipe/section/section_impls/bacalhau/Cargo.toml @@ -6,6 +6,9 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +arrow = { version = "49", features = ["prettyprint"] } +arrow-json = "49" +arrow_msg = { path = "../../../arrow_msg" } section = { version = "0.2", path = "../../../section/" } serde = { version = "1", features = ["derive"] } serde_json = "1" @@ -16,5 +19,6 @@ tokio-util = "0.7.10" handlebars = "4.4" serde_yaml = "0.9" + [dev-dependencies] stub = { path = "../../section_impls/stub/" } diff --git a/pipe/section/section_impls/bacalhau/src/destination.rs b/pipe/section/section_impls/bacalhau/src/destination.rs index d2c463eb..14e769de 100644 --- a/pipe/section/section_impls/bacalhau/src/destination.rs +++ b/pipe/section/section_impls/bacalhau/src/destination.rs @@ -1,19 +1,19 @@ -use crate::jobstore::JobStore; -use crate::{api::submit, BacalhauPayload}; - -use section::message::ValueView; +use arrow_msg::df_to_recordbatch; use section::{ command_channel::{Command, SectionChannel}, - futures::{self, FutureExt, Sink, SinkExt, Stream, StreamExt}, + futures::{self, FutureExt, Sink, Stream, StreamExt}, message::Chunk, pretty_print::pretty_print, section::Section, SectionError, SectionMessage, }; + use std::collections::HashMap; +use std::future::Future; use std::pin::{pin, Pin}; -use std::future::Future; +use crate::api::submit; +use crate::jobstore::JobStore; type StdError = Box; @@ -76,24 +76,17 @@ impl Bacalhau { Some(_ch) => continue, }; - - let columns = frame.columns(); - - section_channel.log(format!("got dataframe chunk from {}:\n{}", msg.origin(), pretty_print(&*frame))).await?; - let mut c = columns.into_iter().nth(0).unwrap(); - match c.nth(0).unwrap() { - ValueView::Str(s) => { - section_channel.log(format!("got string: {}", s)).await?; - } - _ => continue, - } + let batch = df_to_recordbatch(frame).unwrap(); + let json_rows = arrow_json::writer::record_batches_to_json_rows(&[&batch]).unwrap(); + let first = json_rows.first().unwrap(); + let args: HashMap = first.iter().map(|(k, v)| { + (k.clone(), v.as_str().unwrap().to_string()) + }).collect(); - //ValueView::Str(child.as_string::().value(value_offset)), - let args: HashMap = HashMap::new(); - //args.insert("target".into(), "".into()); + section_channel.log(format!("Arguments: {:?}", &args)).await?; self.submit_job(&args).await?; msg.ack().await; diff --git a/pipe/section/section_impls/bacalhau/src/lib.rs b/pipe/section/section_impls/bacalhau/src/lib.rs index f8aa98a3..7c32ffbc 100644 --- a/pipe/section/section_impls/bacalhau/src/lib.rs +++ b/pipe/section/section_impls/bacalhau/src/lib.rs @@ -1,69 +1,5 @@ -use std::sync::Arc; - -use section::message::{Ack, Chunk, Column, DataFrame, DataType, Message, ValueView}; pub mod api; pub mod destination; pub mod jobstore; type StdError = Box; - -#[derive(Debug, Clone, PartialEq)] -pub struct BacalhauPayload { - pub filepath: String, -} - -impl BacalhauPayload { - fn new(filepath: String) -> Self { - Self { filepath } - } -} - -impl DataFrame for BacalhauPayload { - fn columns(&self) -> Vec> { - vec![Column::new( - "filepath", - DataType::Str, - Box::new(std::iter::once(ValueView::from(&self.filepath))), - )] - } -} - -pub struct BacalhauMessage { - origin: Arc, - payload: Option>, - ack: Option, -} - -impl BacalhauMessage { - fn new(origin: Arc, payload: impl DataFrame, ack: Option) -> Self { - Self { - origin, - payload: Some(Box::new(payload)), - ack, - } - } -} - -impl std::fmt::Debug for BacalhauMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("BacalhauMessage") - .field("origin", &self.origin) - .field("payload", &self.payload) - .finish() - } -} - -impl Message for BacalhauMessage { - fn origin(&self) -> &str { - self.origin.as_ref() - } - - fn next(&mut self) -> section::message::Next<'_> { - let v = self.payload.take().map(Chunk::DataFrame); - Box::pin(async move { Ok(v) }) - } - - fn ack(&mut self) -> Ack { - self.ack.take().unwrap_or(Box::pin(async {})) - } -}