-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds bacalhau support #1
base: main
Are you sure you want to change the base?
Conversation
Adds a destination node for bacalhau that is able to submit jobs to a local requester node, using the orchestrator API. Each instance of the destination node is configured with the location of the jobstore, a local directory containing .yaml files describing jobs. It should also be provided with the job name (one of the .yaml files) so that it might be configured with the following ``` jobstore: /tmp/jobstore job: process ``` and will load /tmp/jobstore/process.yaml as the job specification. Once loaded the job specification can be templated using handlebars syntax, so that {{key}} will be replaced with values from the 'key' field in the input message.
We really only expect one row in the incoming message and so we'll convert that to a string-string hashmap to pass as the arguments for the job submission. We do this in a roundabout route (dataframe->recordbatch->json->hashmap).
e0ee014
to
f715b73
Compare
WalkthroughThe recent updates introduce a new Changes
Recent Review DetailsConfiguration used: CodeRabbit UI Files ignored due to path filters (1)
Files selected for processing (14)
Files skipped from review due to trivial changes (1)
Additional comments not posted (13)
TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
impl JobStore { | ||
pub fn new(jobstore: impl Into<String>) -> Result<Self, StdError> { | ||
let path = PathBuf::from(jobstore.into()); | ||
let entries = path.read_dir()?; | ||
|
||
let mut handlebars = Handlebars::new(); | ||
for e in entries { | ||
let entry = e?; | ||
let meta = entry.metadata()?; | ||
if !meta.is_file() { | ||
continue; | ||
} | ||
|
||
let entryname = entry.file_name(); | ||
let fname = entryname.to_str().unwrap(); | ||
if fname.ends_with("yaml") || fname.ends_with("yml") { | ||
let p = entry.path(); | ||
let name = p.file_stem().unwrap(); | ||
|
||
handlebars.register_template_string( | ||
name.to_str().unwrap(), | ||
&*fs::read_to_string(entry.path()) | ||
.expect("should have been able to read the file"), | ||
)?; | ||
} | ||
} | ||
|
||
Ok(Self { | ||
templates: handlebars, | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider handling potential errors more gracefully in the new
method of JobStore
. Currently, the method uses unwrap
and expect
, which could lead to panics if the data is not as expected. It would be safer to return an error in these cases.
- let fname = entryname.to_str().unwrap();
- let name = p.file_stem().unwrap();
+ let fname = entryname.to_str().ok_or("Filename not valid UTF-8")?;
+ let name = p.file_stem().ok_or("File stem not available")?;
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
impl JobStore { | |
pub fn new(jobstore: impl Into<String>) -> Result<Self, StdError> { | |
let path = PathBuf::from(jobstore.into()); | |
let entries = path.read_dir()?; | |
let mut handlebars = Handlebars::new(); | |
for e in entries { | |
let entry = e?; | |
let meta = entry.metadata()?; | |
if !meta.is_file() { | |
continue; | |
} | |
let entryname = entry.file_name(); | |
let fname = entryname.to_str().unwrap(); | |
if fname.ends_with("yaml") || fname.ends_with("yml") { | |
let p = entry.path(); | |
let name = p.file_stem().unwrap(); | |
handlebars.register_template_string( | |
name.to_str().unwrap(), | |
&*fs::read_to_string(entry.path()) | |
.expect("should have been able to read the file"), | |
)?; | |
} | |
} | |
Ok(Self { | |
templates: handlebars, | |
}) | |
} | |
impl JobStore { | |
pub fn new(jobstore: impl Into<String>) -> Result<Self, StdError> { | |
let path = PathBuf::from(jobstore.into()); | |
let entries = path.read_dir()?; | |
let mut handlebars = Handlebars::new(); | |
for e in entries { | |
let entry = e?; | |
let meta = entry.metadata()?; | |
if !meta.is_file() { | |
continue; | |
} | |
let entryname = entry.file_name(); | |
let fname = entryname.to_str().ok_or("Filename not valid UTF-8")?; | |
if fname.ends_with("yaml") || fname.ends_with("yml") { | |
let p = entry.path(); | |
let name = p.file_stem().ok_or("File stem not available")?; | |
handlebars.register_template_string( | |
name.to_str().unwrap(), | |
&*fs::read_to_string(entry.path()) | |
.expect("should have been able to read the file"), | |
)?; | |
} | |
} | |
Ok(Self { | |
templates: handlebars, | |
}) | |
} | |
} |
pub fn render(&self, name: String, data: &HashMap<String, String>) -> Result<String, StdError> { | ||
let text = self.templates.render(&name, &json!(data))?; | ||
let job: Value = serde_yaml::from_str(&text).unwrap(); | ||
|
||
let request = PutJobRequest { | ||
job, | ||
idempotency_token: None, | ||
namespace: None, | ||
headers: None, | ||
}; | ||
|
||
Ok(serde_json::to_string(&request).unwrap()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the render
method of JobStore
, consider handling the potential error from YAML parsing instead of using unwrap
. This change would make the method more robust by properly handling cases where the rendered template is not valid YAML.
- let job: Value = serde_yaml::from_str(&text).unwrap();
+ let job: Value = serde_yaml::from_str(&text).map_err(|e| StdError::from(e))?;
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
pub fn render(&self, name: String, data: &HashMap<String, String>) -> Result<String, StdError> { | |
let text = self.templates.render(&name, &json!(data))?; | |
let job: Value = serde_yaml::from_str(&text).unwrap(); | |
let request = PutJobRequest { | |
job, | |
idempotency_token: None, | |
namespace: None, | |
headers: None, | |
}; | |
Ok(serde_json::to_string(&request).unwrap()) | |
} | |
pub fn render(&self, name: String, data: &HashMap<String, String>) -> Result<String, StdError> { | |
let text = self.templates.render(&name, &json!(data))?; | |
let job: Value = serde_yaml::from_str(&text).map_err(|e| StdError::from(e))?; | |
let request = PutJobRequest { | |
job, | |
idempotency_token: None, | |
namespace: None, | |
headers: None, | |
}; | |
Ok(serde_json::to_string(&request).unwrap()) | |
} |
impl Bacalhau { | ||
pub fn new(job: impl Into<String>, jobstore: impl Into<String>) -> Self { | ||
Self { | ||
job: job.into(), | ||
jobstore: JobStore::new(jobstore).expect("should be able to create jobstore"), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the new
method of Bacalhau
, consider modifying the method to return an error instead of using expect
. This change would make the method more robust by properly handling cases where the jobstore cannot be created.
- jobstore: JobStore::new(jobstore).expect("should be able to create jobstore"),
+ jobstore: JobStore::new(jobstore)?,
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
impl Bacalhau { | |
pub fn new(job: impl Into<String>, jobstore: impl Into<String>) -> Self { | |
Self { | |
job: job.into(), | |
jobstore: JobStore::new(jobstore).expect("should be able to create jobstore"), | |
} | |
impl Bacalhau { | |
pub fn new(job: impl Into<String>, jobstore: impl Into<String>) -> Self { | |
Self { | |
job: job.into(), | |
jobstore: JobStore::new(jobstore)?, | |
} |
Adds a destination node for bacalhau that is able to submit jobs to a local requester node, using the orchestrator API.
Each instance of the destination node is configured with the location of the jobstore, a local directory containing .yaml files describing jobs. It should also be provided with the job name (one of the .yaml files) so that it might be configured with the following
and will load /tmp/jobstore/process.yaml as the job specification. Once loaded the job specification can be templated using handlebars syntax, so that {{key}} will be replaced with values from the 'key' field in the input message.
TODO
Summary by CodeRabbit
New Features
Documentation
Refactor
Chores
.gitignore
to exclude the.vscode/
directory.