Skip to content

Commit

Permalink
Translate incoming message to hashmap
Browse files Browse the repository at this point in the history
We only expect a single row in the incoming message, so we convert
that row to a string-to-string hashmap and pass it as the arguments for
the job submission. The conversion takes a roundabout route
(dataframe -> recordbatch -> json -> hashmap).
  • Loading branch information
rossjones committed Jan 12, 2024
1 parent fc8df54 commit e0ee014
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 84 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pipe/section/section_impls/bacalhau/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { version = "49", features = ["prettyprint"] }
arrow-json = "49"
arrow_msg = { path = "../../../arrow_msg" }
section = { version = "0.2", path = "../../../section/" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand All @@ -16,5 +19,6 @@ tokio-util = "0.7.10"
handlebars = "4.4"
serde_yaml = "0.9"


[dev-dependencies]
stub = { path = "../../section_impls/stub/" }
33 changes: 13 additions & 20 deletions pipe/section/section_impls/bacalhau/src/destination.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
use crate::jobstore::JobStore;
use crate::{api::submit, BacalhauPayload};

use section::message::ValueView;
use arrow_msg::df_to_recordbatch;
use section::{
command_channel::{Command, SectionChannel},
futures::{self, FutureExt, Sink, SinkExt, Stream, StreamExt},
futures::{self, FutureExt, Sink, Stream, StreamExt},
message::Chunk,
pretty_print::pretty_print,
section::Section,
SectionError, SectionMessage,
};

use std::collections::HashMap;
use std::future::Future;
use std::pin::{pin, Pin};

use std::future::Future;
use crate::api::submit;
use crate::jobstore::JobStore;

type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;

Expand Down Expand Up @@ -76,24 +76,17 @@ impl Bacalhau {
Some(_ch) => continue,
};


let columns = frame.columns();


section_channel.log(format!("got dataframe chunk from {}:\n{}", msg.origin(), pretty_print(&*frame))).await?;

let mut c = columns.into_iter().nth(0).unwrap();
match c.nth(0).unwrap() {
ValueView::Str(s) => {
section_channel.log(format!("got string: {}", s)).await?;
}
_ => continue,
}

let batch = df_to_recordbatch(frame).unwrap();
let json_rows = arrow_json::writer::record_batches_to_json_rows(&[&batch]).unwrap();
let first = json_rows.first().unwrap();
let args: HashMap<String, String> = first.iter().map(|(k, v)| {
(k.clone(), v.as_str().unwrap().to_string())
}).collect();

//ValueView::Str(child.as_string::<i32>().value(value_offset)),
let args: HashMap<String, String> = HashMap::new();
//args.insert("target".into(), "".into());
section_channel.log(format!("Arguments: {:?}", &args)).await?;
self.submit_job(&args).await?;

msg.ack().await;
Expand Down
64 changes: 0 additions & 64 deletions pipe/section/section_impls/bacalhau/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,69 +1,5 @@
use std::sync::Arc;

use section::message::{Ack, Chunk, Column, DataFrame, DataType, Message, ValueView};
pub mod api;
pub mod destination;
pub mod jobstore;

type StdError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// Payload carrying a single file path.
#[derive(Debug, Clone, PartialEq)]
pub struct BacalhauPayload {
    /// The file path held by this payload.
    pub filepath: String,
}

impl BacalhauPayload {
    /// Builds a payload that owns the given `filepath`.
    fn new(filepath: String) -> Self {
        BacalhauPayload { filepath }
    }
}

impl DataFrame for BacalhauPayload {
    /// Exposes the payload as one `Str` column named `"filepath"` whose
    /// iterator yields exactly one value: a view of `self.filepath`.
    fn columns(&self) -> Vec<section::message::Column<'_>> {
        // A one-shot iterator over the only value this payload holds.
        let values = std::iter::once(ValueView::from(&self.filepath));
        let filepath_column = Column::new("filepath", DataType::Str, Box::new(values));
        vec![filepath_column]
    }
}

/// A message made of an origin label, an optional boxed data frame and an
/// optional acknowledgement.
pub struct BacalhauMessage {
    /// Label returned by `Message::origin`.
    origin: Arc<str>,
    /// The data frame, boxed as a trait object; `None` once it has been taken.
    payload: Option<Box<dyn DataFrame>>,
    /// Acknowledgement to hand out; `None` if absent or already taken.
    ack: Option<Ack>,
}

impl BacalhauMessage {
    /// Creates a message from `origin`, boxing `payload` as a `dyn DataFrame`
    /// and storing `ack` as given.
    fn new(origin: Arc<str>, payload: impl DataFrame, ack: Option<Ack>) -> Self {
        let boxed: Box<dyn DataFrame> = Box::new(payload);
        BacalhauMessage {
            origin,
            payload: Some(boxed),
            ack,
        }
    }
}

impl std::fmt::Debug for BacalhauMessage {
    /// Formats the message as a `BacalhauMessage` struct showing only the
    /// `origin` and `payload` fields; `ack` is left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("BacalhauMessage");
        builder.field("origin", &self.origin);
        builder.field("payload", &self.payload);
        builder.finish()
    }
}

impl Message for BacalhauMessage {
    /// Returns the origin label of this message.
    fn origin(&self) -> &str {
        &self.origin
    }

    /// Takes the stored payload, if any, and resolves to it wrapped in
    /// `Chunk::DataFrame`. Once the payload has been taken, later calls
    /// resolve to `Ok(None)`.
    fn next(&mut self) -> section::message::Next<'_> {
        let chunk = match self.payload.take() {
            Some(frame) => Some(Chunk::DataFrame(frame)),
            None => None,
        };
        Box::pin(async move { Ok(chunk) })
    }

    /// Takes the stored acknowledgement; if there is none (or it was already
    /// taken), returns a future that completes immediately.
    fn ack(&mut self) -> Ack {
        if let Some(pending) = self.ack.take() {
            return pending;
        }
        Box::pin(async {})
    }
}

0 comments on commit e0ee014

Please sign in to comment.