Conversation
There was a problem hiding this comment.
Pull request overview
This PR implements a cron adapter for the Draco protocol adapter system, enabling scheduled task execution based on cron expressions.
Key Changes:
- Added new cron adapter that schedules and executes flows based on cron expressions
- Modified
AdapterStoreto support fire-and-forget flow execution via a newwait_for_resultparameter - Updated project status to reflect cron-jobs as under construction
Reviewed changes
Copilot reviewed 6 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| adapter/cron/src/main.rs | New cron adapter implementation with scheduling logic and flow identification |
| adapter/cron/Cargo.toml | Cargo configuration for the cron adapter package |
| crates/base/src/store.rs | Added wait_for_result parameter to support both request-response and publish patterns; fixed typo in documentation |
| adapter/rest/src/main.rs | Updated to pass true for wait_for_result parameter to maintain existing request-response behavior |
| Cargo.toml | Added cron adapter to workspace members and cron/chrono dependencies |
| Cargo.lock | Lock file updates for new dependencies (chrono, cron, and transitive dependencies) |
| README.md | Updated roadmap to show Cron-Jobs as under construction and removed AMQP entry |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ctx.adapter_store | ||
| .validate_and_execute_flow(flow, None, false) | ||
| .await; |
There was a problem hiding this comment.
[nitpick] When multiple cron flows need to execute, they are executed sequentially with await. If any flow execution takes a long time, it will delay subsequent flows. Consider spawning tasks concurrently using tokio::spawn to avoid blocking other scheduled flows from executing on time.
| ctx.adapter_store | |
| .validate_and_execute_flow(flow, None, false) | |
| .await; | |
| let adapter_store = ctx.adapter_store.clone(); | |
| tokio::spawn(async move { | |
| adapter_store | |
| .validate_and_execute_flow(flow, None, false) | |
| .await; | |
| }); |
| let now = Utc::now(); | ||
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | ||
| let until_next = next - now; | ||
| tokio::time::sleep(until_next.to_std()?).await; | ||
|
|
There was a problem hiding this comment.
The Time struct uses now which is captured before sleeping, but the sleep happens before this struct is created. This means the time used for flow identification will be stale. The now should be captured after the sleep completes, not before.
Currently:
let now = Utc::now();
// ... sleep happens ...
let time = Time { now }; // uses stale timeShould be:
// ... sleep happens ...
let now = Utc::now(); // capture current time after sleep
let time = Time { now };| let now = Utc::now(); | |
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | |
| let until_next = next - now; | |
| tokio::time::sleep(until_next.to_std()?).await; | |
| if let Some(next) = schedule.upcoming(Utc).take(1).next() { | |
| let now = Utc::now(); | |
| let until_next = next - now; | |
| tokio::time::sleep(until_next.to_std()?).await; | |
| let now = Utc::now(); |
| let schedule = Schedule::from_str(expression.as_str()).unwrap(); | ||
| let next = schedule.upcoming(Utc).next().unwrap(); |
There was a problem hiding this comment.
Using unwrap() on Schedule::from_str() and upcoming().next() can cause panics if the cron expression is invalid or if there are no upcoming times. Consider handling these errors gracefully with proper error messages, especially since the cron expression comes from user-provided flow settings.
| let schedule = Schedule::from_str(expression.as_str()).unwrap(); | |
| let next = schedule.upcoming(Utc).next().unwrap(); | |
| let schedule = match Schedule::from_str(expression.as_str()) { | |
| Ok(s) => s, | |
| Err(_) => { | |
| // Invalid cron expression, cannot identify | |
| return false; | |
| } | |
| }; | |
| let next = match schedule.upcoming(Utc).next() { | |
| Some(n) => n, | |
| None => { | |
| // No upcoming times, cannot identify | |
| return false; | |
| } | |
| }; |
| return false; | ||
| }; | ||
|
|
||
| let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); |
There was a problem hiding this comment.
The cron expression format is incorrect. The code extracts fields named "minute", "hour", "day_of_month", "month", and "day_of_week", then creates a 6-field expression with a leading asterisk. In the standard 6-field cron format (with seconds), the order is: second minute hour day_of_month month day_of_week.
The expression should be: "0 {} {} {} {} {}" (setting seconds to 0), not "* {} {} {} {} {}" (which would allow any second). Alternatively, if only 5 fields are intended, remove the leading asterisk entirely and use: "{} {} {} {} {}".
The current format "* minute hour dom month dow" creates an ambiguous expression where the asterisk is in the seconds position but the subsequent fields are shifted incorrectly.
| let expression = format!("* {} {} {} {} {}", minute, hour, dom, month, dow); | |
| let expression = format!("0 {} {} {} {} {}", minute, hour, dom, month, dow); |
| pub client: async_nats::Client, | ||
| pub kv: async_nats::jetstream::kv::Store, |
There was a problem hiding this comment.
[nitpick] Making the client and kv fields public exposes internal implementation details. Consider if these need to be public or if access should be provided through dedicated methods to maintain better encapsulation.
| pub client: async_nats::Client, | |
| pub kv: async_nats::jetstream::kv::Store, | |
| client: async_nats::Client, | |
| kv: async_nats::jetstream::kv::Store, |
| match self.client.publish(topic, bytes.into()).await { | ||
| Ok(_) => None, | ||
| Err(err) => { | ||
| log::error!("Failed to send request to NATS server: {:?}", err); |
There was a problem hiding this comment.
The error message "Failed to send request to NATS server" is misleading when using publish instead of request. When wait_for_result is false, the operation is a publish (fire-and-forget), not a request-response. Consider using a more accurate message like "Failed to publish to NATS server".
| log::error!("Failed to send request to NATS server: {:?}", err); | |
| log::error!("Failed to publish to NATS server: {:?}", err); |
| &self, | ||
| flow: ValidationFlow, | ||
| input_value: Option<Value>, | ||
| wait_for_result: bool, |
There was a problem hiding this comment.
The wait_for_result parameter is not documented in the function's documentation comment. Please add documentation explaining when this should be true vs false and what the implications are for the return value.
Resolves: #80