Skip to content

Commit

Permalink
Unify duration parsing in admin api and ingress
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper committed May 22, 2024
1 parent 58479ca commit 6102c4e
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 14 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/admin/src/rest_api/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ pub async fn modify_service<V>(
}
if let Some(new_idempotency_retention) = idempotency_retention {
modify_request.push(ModifyServiceChange::IdempotencyRetention(
new_idempotency_retention.into(),
new_idempotency_retention,
));
}
if let Some(new_workflow_completion_retention) = workflow_completion_retention {
modify_request.push(ModifyServiceChange::WorkflowCompletionRetention(
new_workflow_completion_retention.into(),
new_workflow_completion_retention,
));
}

Expand Down
2 changes: 1 addition & 1 deletion crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ options_schema = ["dep:schemars"]
restate-core = { workspace = true }
restate-errors = { workspace = true }
restate-ingress-dispatcher = { workspace = true }
restate-serde-util = { workspace = true }
restate-schema-api = { workspace = true, features = ["service", "invocation_target"]}
restate-types = { workspace = true }
restate-service-protocol = { workspace = true, features = [ "awakeable-id" ] }
Expand Down Expand Up @@ -60,7 +61,6 @@ schemars = { workspace = true, optional = true }
thiserror = { workspace = true }
urlencoding = "2.1"
pin-project-lite = "0.2.13"
iso8601 = "0.6.1"
humantime = { workspace = true }

[dev-dependencies]
Expand Down
22 changes: 18 additions & 4 deletions crates/ingress-http/src/handler/service_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use restate_types::invocation::{
Header, InvocationTarget, InvocationTargetType, ServiceInvocation, Source, SpanRelation,
WorkflowHandlerType,
};
use serde::Serialize;
use serde::de::IntoDeserializer;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use std::time::{Duration, Instant, SystemTime};
use tracing::{info, trace, warn, Instrument};

Expand Down Expand Up @@ -315,6 +317,16 @@ fn parse_headers(headers: HeaderMap) -> Result<Vec<Header>, HandlerError> {
.collect()
}

#[serde_as]
#[derive(Deserialize)]
#[serde(transparent)]
struct DurationQueryParam(
#[serde_as(
as = "serde_with::PickFirst<(restate_serde_util::DurationString, serde_with::DurationMilliSeconds)>"
)]
Duration,
);

fn parse_delay(query: Option<&str>) -> Result<Option<Duration>, HandlerError> {
if query.is_none() {
return Ok(None);
Expand All @@ -323,9 +335,11 @@ fn parse_delay(query: Option<&str>) -> Result<Option<Duration>, HandlerError> {
for (k, v) in url::form_urlencoded::parse(query.unwrap().as_bytes()) {
if k.eq_ignore_ascii_case(DELAY_QUERY_PARAM) {
return Ok(Some(
iso8601::duration(v.as_ref())
.map_err(HandlerError::BadDelayDuration)?
.into(),
DurationQueryParam::deserialize(v.as_ref().into_deserializer())
.map_err(|e: serde::de::value::Error| {
HandlerError::BadDelayDuration(e.to_string())
})?
.0,
));
}
if k.eq_ignore_ascii_case(DELAYSEC_QUERY_PARAM) {
Expand Down
19 changes: 13 additions & 6 deletions crates/meta-rest-model/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::Duration;

// Export schema types to be used by other crates without exposing the fact
// that we are using proxying to restate-schema-api or restate-types
Expand Down Expand Up @@ -38,19 +39,25 @@ pub struct ModifyServiceRequest {
///
/// Modify the retention of idempotent requests for this service.
///
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.
#[serde(default, with = "serde_with::As::<Option<serde_with::DisplayFromStr>>")]
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format or the ISO8601.
#[serde(
default,
with = "serde_with::As::<Option<restate_serde_util::DurationString>>"
)]
#[cfg_attr(feature = "schema", schemars(with = "Option<String>"))]
pub idempotency_retention: Option<humantime::Duration>,
pub idempotency_retention: Option<Duration>,

/// # Workflow completion retention
///
/// Modify the retention of the workflow completion. This can be modified only for workflow services!
///
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format.
#[serde(default, with = "serde_with::As::<Option<serde_with::DisplayFromStr>>")]
/// Can be configured using the [`humantime`](https://docs.rs/humantime/latest/humantime/fn.parse_duration.html) format or the ISO8601.
#[serde(
default,
with = "serde_with::As::<Option<restate_serde_util::DurationString>>"
)]
#[cfg_attr(feature = "schema", schemars(with = "Option<String>"))]
pub workflow_completion_retention: Option<humantime::Duration>,
pub workflow_completion_retention: Option<Duration>,
}

#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))]
Expand Down
2 changes: 2 additions & 0 deletions crates/serde-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ proto = ["dep:prost", "dep:bytes"]
bytes = { workspace = true, optional = true }
bytesize = { version = "1.3.0" }
http = { workspace = true }
humantime = { workspace = true }
iso8601 = "0.6.1"
prost = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
Expand Down
107 changes: 107 additions & 0 deletions crates/serde-util/src/duration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use serde::de::Error;
use serde::{Deserialize, Deserializer, Serializer};
use serde_with::{DeserializeAs, SerializeAs};

/// Serializable/Deserializable duration to use with serde_with.
///
/// When serializing the humantime format is used.
///
/// When deserializing, the following formats are accepted:
///
/// * ISO8601 durations
/// * Humantime durations
pub struct DurationString;

impl<'de> DeserializeAs<'de, std::time::Duration> for DurationString {
fn deserialize_as<D>(deserializer: D) -> Result<std::time::Duration, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
if s.starts_with('P') {
Ok(iso8601::duration(&s).map_err(Error::custom)?.into())
} else {
humantime::parse_duration(&s).map_err(Error::custom)
}
}
}

impl SerializeAs<std::time::Duration> for DurationString {
fn serialize_as<S>(source: &std::time::Duration, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.collect_str(&humantime::Duration::from(*source))
}
}

#[cfg(test)]
mod tests {
use super::*;

use serde::{Deserialize, Serialize};

use serde_with::serde_as;

#[serde_as]
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
struct MyDuration(#[serde_as(as = "DurationString")] std::time::Duration);

#[test]
fn serialize_humantime() {
let d = std::time::Duration::from_secs(60 * 23);

let result_string =
serde_json::from_str::<String>(&serde_json::to_string(&MyDuration(d)).unwrap())
.unwrap();

assert_eq!(result_string, humantime::Duration::from(d).to_string());
}

#[test]
fn deserialize_iso8601() {
let d = std::time::Duration::from_secs(10);

assert_eq!(
serde_json::from_value::<MyDuration>(serde_json::Value::String("PT10S".to_owned()))
.unwrap()
.0,
d
);
}

#[test]
fn deserialize_humantime() {
let d = std::time::Duration::from_secs(60 * 23);

assert_eq!(
serde_json::from_value::<MyDuration>(serde_json::Value::String(
humantime::Duration::from(d).to_string()
))
.unwrap()
.0,
d
);
}
}
2 changes: 2 additions & 0 deletions crates/serde-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ mod header_map;
mod proto;

pub mod default;
mod duration;
pub mod header_value;

pub use byte_count::*;
pub use duration::DurationString;
pub use header_map::SerdeableHeaderHashMap;
pub use header_value::HeaderValueSerde;
#[cfg(feature = "proto")]
Expand Down

0 comments on commit 6102c4e

Please sign in to comment.