diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 32dbd016c..c7eac74e3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,6 +117,24 @@ jobs: RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug testArtifactOutput: sdk-java-kafka-next-gen-integration-test-report + sdk-java-invocation-status-killed: + name: Run SDK-Java integration tests with InvocationStatusKilled + permissions: + contents: read + issues: read + checks: write + pull-requests: write + actions: read + secrets: inherit + needs: docker + uses: restatedev/sdk-java/.github/workflows/integration.yaml@main + with: + restateCommit: ${{ github.event.pull_request.head.sha || github.sha }} + envVars: | + RESTATE_WORKER__EXPERIMENTAL_FEATURE_INVOCATION_STATUS_KILLED=true + RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug + testArtifactOutput: sdk-java-invocation-status-killed-integration-test-report + sdk-python: name: Run SDK-Python integration tests permissions: diff --git a/Cargo.lock b/Cargo.lock index c87af8888..4242a86d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6994,6 +6994,7 @@ dependencies = [ "codederror", "derive_builder", "derive_more", + "enumset", "futures", "googletest", "humantime", diff --git a/cli/src/clients/datafusion_helpers.rs b/cli/src/clients/datafusion_helpers.rs index f7b781bf0..b72013e0e 100644 --- a/cli/src/clients/datafusion_helpers.rs +++ b/cli/src/clients/datafusion_helpers.rs @@ -128,6 +128,7 @@ pub enum InvocationState { Running, Suspended, BackingOff, + Killed, Completed, } @@ -142,6 +143,7 @@ impl FromStr for InvocationState { "suspended" => Self::Suspended, "backing-off" => Self::BackingOff, "completed" => Self::Completed, + "killed" => Self::Killed, _ => Self::Unknown, }) } @@ -157,6 +159,7 @@ impl Display for InvocationState { InvocationState::Running => write!(f, "running"), InvocationState::Suspended => write!(f, "suspended"), InvocationState::BackingOff => write!(f, "backing-off"), + InvocationState::Killed => write!(f, "killed"), InvocationState::Completed => write!(f, "completed"), } } diff --git a/cli/src/ui/invocations.rs b/cli/src/ui/invocations.rs index d1af069b5..0ba03447e 100644 --- a/cli/src/ui/invocations.rs +++ b/cli/src/ui/invocations.rs @@ -113,6 +113,7 @@ pub fn invocation_status_style(status: InvocationState) -> Style { InvocationState::Suspended => DStyle::new().dim(), InvocationState::BackingOff => DStyle::new().red(), InvocationState::Completed => DStyle::new().blue(), + InvocationState::Killed => DStyle::new().red(), } } diff --git a/crates/invoker-api/src/handle.rs b/crates/invoker-api/src/handle.rs index 747efda88..385053cbf 100644 --- a/crates/invoker-api/src/handle.rs +++ b/crates/invoker-api/src/handle.rs @@ -60,6 +60,8 @@ pub trait InvokerHandle { &mut self, partition_leader_epoch: PartitionLeaderEpoch, invocation_id: InvocationId, + // If true, acknowledge the abort. This will generate a Failed effect + acknowledge: bool, ) -> impl Future> + Send; fn register_partition( diff --git a/crates/invoker-api/src/lib.rs b/crates/invoker-api/src/lib.rs index 63845f5f7..d0d554dc1 100644 --- a/crates/invoker-api/src/lib.rs +++ b/crates/invoker-api/src/lib.rs @@ -128,6 +128,7 @@ pub mod test_util { &mut self, _partition_leader_epoch: PartitionLeaderEpoch, _invocation_id: InvocationId, + _acknowledge: bool, ) -> Result<(), NotRunningError> { Ok(()) } diff --git a/crates/invoker-impl/src/input_command.rs b/crates/invoker-impl/src/input_command.rs index 2eabaef42..d717f2c50 100644 --- a/crates/invoker-impl/src/input_command.rs +++ b/crates/invoker-impl/src/input_command.rs @@ -45,6 +45,7 @@ pub(crate) enum InputCommand { Abort { partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, }, /// Command used to clean up internal state when a partition leader is going away @@ -129,11 +130,13 @@ impl restate_invoker_api::InvokerHandle for InvokerHandle { &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, ) -> Result<(), NotRunningError> { self.input .send(InputCommand::Abort { partition, invocation_id, + acknowledge, }) .map_err(|_| NotRunningError) } diff --git a/crates/invoker-impl/src/lib.rs b/crates/invoker-impl/src/lib.rs index 933ed28ea..6f773c92d 100644 --- a/crates/invoker-impl/src/lib.rs +++ b/crates/invoker-impl/src/lib.rs @@ -61,6 +61,7 @@ pub use input_command::ChannelStatusReader; pub use input_command::InvokerHandle; use restate_service_client::{AssumeRoleCacheMode, ServiceClient}; use restate_types::deployment::PinnedDeployment; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::invocation::InvocationTarget; use restate_types::schema::service::ServiceMetadataResolver; @@ -351,8 +352,8 @@ where self.handle_register_partition(partition, partition_key_range, storage_reader, sender); }, - InputCommand::Abort { partition, invocation_id } => { - self.handle_abort_invocation(partition, invocation_id); + InputCommand::Abort { partition, invocation_id, acknowledge } => { + self.handle_abort_invocation(partition, invocation_id, acknowledge).await; } InputCommand::AbortAllPartition { partition } => { self.handle_abort_partition(partition); @@ -808,12 +809,13 @@ where restate.invoker.partition_leader_epoch = ?partition, ) )] - fn handle_abort_invocation( + async fn handle_abort_invocation( &mut self, partition: PartitionLeaderEpoch, invocation_id: InvocationId, + acknowledge: bool, ) { - if let Some((_, _, mut ism)) = self + if let Some((tx, _, mut ism)) = self .invocation_state_machine_manager .remove_invocation(partition, &invocation_id) { @@ -823,6 +825,14 @@ where ism.abort(); self.quota.unreserve_slot(); self.status_store.on_end(&partition, &invocation_id); + if acknowledge { + let _ = tx + .send(Effect { + invocation_id, + kind: EffectKind::Failed(KILLED_INVOCATION_ERROR), + }) + .await; + } } else { trace!("Ignoring Abort command because there is no matching partition/invocation"); } @@ -1415,7 +1425,9 @@ mod tests { assert_eq!(*available_slots, 1); // Abort the invocation - service_inner.handle_abort_invocation(MOCK_PARTITION, invocation_id); + service_inner + .handle_abort_invocation(MOCK_PARTITION, invocation_id, false) + .await; // Check the quota let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota); diff --git a/crates/partition-store/src/invocation_status_table/mod.rs b/crates/partition-store/src/invocation_status_table/mod.rs index 992c4fe14..6b789531a 100644 --- a/crates/partition-store/src/invocation_status_table/mod.rs +++ b/crates/partition-store/src/invocation_status_table/mod.rs @@ -17,11 +17,11 @@ use futures::Stream; use futures_util::stream; use restate_rocksdb::RocksDbPerfGuard; use restate_storage_api::invocation_status_table::{ - InvocationStatus, InvocationStatusTable, InvocationStatusV1, ReadOnlyInvocationStatusTable, + InvocationLite, InvocationStatus, InvocationStatusDiscriminants, InvocationStatusTable, + InvocationStatusV1, InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, }; use restate_storage_api::{Result, StorageError}; use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey}; -use restate_types::invocation::InvocationTarget; use restate_types::storage::StorageCodec; use std::ops::RangeInclusive; use tracing::trace; @@ -169,7 +169,7 @@ fn delete_invocation_status(storage: &mut S, invocation_id: &I fn invoked_invocations( storage: &mut S, partition_key_range: RangeInclusive, -) -> Vec> { +) -> Vec> { let _x = RocksDbPerfGuard::new("invoked-invocations"); let mut invocations = storage.for_each_key_value_in_place( FullScanPartitionKeyRange::(partition_key_range.clone()), @@ -239,12 +239,16 @@ fn all_invocation_status( fn read_invoked_v1_full_invocation_id( mut k: &mut &[u8], v: &mut &[u8], -) -> Result> { +) -> Result> { let invocation_id = invocation_id_from_v1_key_bytes(&mut k)?; let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; if let InvocationStatus::Invoked(invocation_meta) = invocation_status.0 { - Ok(Some((invocation_id, invocation_meta.invocation_target))) + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_meta.invocation_target, + is_invoked: true, + })) } else { Ok(None) } @@ -253,13 +257,22 @@ fn read_invoked_v1_full_invocation_id( fn read_invoked_full_invocation_id( mut k: &mut &[u8], v: &mut &[u8], -) -> Result> { - // TODO this can be improved by simply parsing InvocationTarget and the Status enum +) -> Result> { let invocation_id = invocation_id_from_key_bytes(&mut k)?; - let invocation_status = StorageCodec::decode::(v) + let invocation_status = StorageCodec::decode::(v) .map_err(|err| StorageError::Generic(err.into()))?; - if let InvocationStatus::Invoked(invocation_meta) = invocation_status { - Ok(Some((invocation_id, invocation_meta.invocation_target))) + if let InvocationStatusDiscriminants::Invoked = invocation_status.status { + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_status.invocation_target, + is_invoked: true, + })) + } else if let InvocationStatusDiscriminants::Killed = invocation_status.status { + Ok(Some(InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target: invocation_status.invocation_target, + is_invoked: false, + })) } else { Ok(None) } @@ -274,9 +287,9 @@ impl ReadOnlyInvocationStatusTable for PartitionStore { get_invocation_status(self, invocation_id) } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { stream::iter(invoked_invocations( self, self.partition_key_range().clone(), @@ -300,9 +313,9 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> { try_migrate_and_get_invocation_status(self, invocation_id) } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send { + ) -> impl Stream> + Send { stream::iter(invoked_invocations( self, self.partition_key_range().clone(), diff --git a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs index ed662ae9d..442a9ce3b 100644 --- a/crates/partition-store/src/tests/invocation_status_table_test/mod.rs +++ b/crates/partition-store/src/tests/invocation_status_table_test/mod.rs @@ -22,7 +22,8 @@ use googletest::prelude::*; use once_cell::sync::Lazy; use restate_storage_api::invocation_status_table::{ InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, InvocationStatusV1, - JournalMetadata, ReadOnlyInvocationStatusTable, StatusTimestamps, + InvokedOrKilledInvocationStatusLite, JournalMetadata, ReadOnlyInvocationStatusTable, + StatusTimestamps, }; use restate_storage_api::Transaction; use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey}; @@ -94,6 +95,19 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus { }) } +fn killed_status(invocation_target: InvocationTarget) -> InvocationStatus { + InvocationStatus::Killed(InFlightInvocationMetadata { + invocation_target, + journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()), + pinned_deployment: None, + response_sinks: HashSet::new(), + timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)), + source: Source::Ingress(*RPC_REQUEST_ID), + completion_retention_duration: Duration::ZERO, + idempotency_key: None, + }) +} + fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus { InvocationStatus::Suspended { metadata: InFlightInvocationMetadata { @@ -131,7 +145,7 @@ async fn populate_data(txn: &mut T) { txn.put_invocation_status( &INVOCATION_ID_4, - &invoked_status(INVOCATION_TARGET_4.clone()), + &killed_status(INVOCATION_TARGET_4.clone()), ) .await; @@ -154,22 +168,34 @@ async fn verify_point_lookups(txn: &mut T) { txn.get_invocation_status(&INVOCATION_ID_4) .await .expect("should not fail"), - invoked_status(INVOCATION_TARGET_4.clone()) + killed_status(INVOCATION_TARGET_4.clone()) ); } async fn verify_all_svc_with_status_invoked(txn: &mut T) { let actual = txn - .all_invoked_invocations() + .all_invoked_or_killed_invocations() .try_collect::>() .await .unwrap(); assert_that!( actual, unordered_elements_are![ - eq((*INVOCATION_ID_1, INVOCATION_TARGET_1.clone())), - eq((*INVOCATION_ID_2, INVOCATION_TARGET_2.clone())), - eq((*INVOCATION_ID_4, INVOCATION_TARGET_4.clone())) + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_1, + invocation_target: INVOCATION_TARGET_1.clone(), + is_invoked: true, + }), + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_2, + invocation_target: INVOCATION_TARGET_2.clone(), + is_invoked: true, + }), + eq(InvokedOrKilledInvocationStatusLite { + invocation_id: *INVOCATION_ID_4, + invocation_target: INVOCATION_TARGET_4.clone(), + is_invoked: false, + }), ] ); } diff --git a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto index 7d793769f..389b5cb6d 100644 --- a/crates/storage-api/proto/dev/restate/storage/v1/domain.proto +++ b/crates/storage-api/proto/dev/restate/storage/v1/domain.proto @@ -119,6 +119,7 @@ message InvocationStatusV2 { INBOXED = 2; INVOKED = 3; SUSPENDED = 4; + KILLED = 6; COMPLETED = 5; } @@ -160,6 +161,12 @@ message InvocationStatusV2 { ResponseResult result = 18; } +// Slimmer version of InvocationStatusV2 +message InvocationV2Lite { + InvocationStatusV2.Status status = 1; + InvocationTarget invocation_target = 2; +} + // TODO remove this after 1.1 message InvocationStatus { diff --git a/crates/storage-api/src/invocation_status_table/mod.rs b/crates/storage-api/src/invocation_status_table/mod.rs index c052e900c..585e4e635 100644 --- a/crates/storage-api/src/invocation_status_table/mod.rs +++ b/crates/storage-api/src/invocation_status_table/mod.rs @@ -189,6 +189,7 @@ pub enum InvocationStatus { metadata: InFlightInvocationMetadata, waiting_for_completed_entries: HashSet, }, + Killed(InFlightInvocationMetadata), Completed(CompletedInvocation), /// Service instance is currently not invoked #[default] @@ -203,6 +204,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.invocation_target), InvocationStatus::Invoked(metadata) => Some(&metadata.invocation_target), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.invocation_target), + InvocationStatus::Killed(metadata) => Some(&metadata.invocation_target), InvocationStatus::Completed(completed) => Some(&completed.invocation_target), _ => None, } @@ -215,6 +217,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.source), InvocationStatus::Invoked(metadata) => Some(&metadata.source), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.source), + InvocationStatus::Killed(metadata) => Some(&metadata.source), InvocationStatus::Completed(completed) => Some(&completed.source), _ => None, } @@ -227,6 +230,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => metadata.metadata.idempotency_key.as_ref(), InvocationStatus::Invoked(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Suspended { metadata, .. } => metadata.idempotency_key.as_ref(), + InvocationStatus::Killed(metadata) => metadata.idempotency_key.as_ref(), InvocationStatus::Completed(completed) => completed.idempotency_key.as_ref(), _ => None, } @@ -237,6 +241,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(metadata.journal_metadata), _ => None, } } @@ -246,6 +251,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(&metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(&metadata.journal_metadata), _ => None, } } @@ -255,6 +261,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(&mut metadata.journal_metadata), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.journal_metadata), + InvocationStatus::Killed(metadata) => Some(&mut metadata.journal_metadata), _ => None, } } @@ -264,6 +271,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -273,6 +281,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -282,6 +291,7 @@ impl InvocationStatus { match self { InvocationStatus::Invoked(metadata) => Some(metadata), InvocationStatus::Suspended { metadata, .. } => Some(metadata), + InvocationStatus::Killed(metadata) => Some(metadata), _ => None, } } @@ -295,6 +305,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&mut metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.response_sinks), + InvocationStatus::Killed(metadata) => Some(&mut metadata.response_sinks), _ => None, } } @@ -306,6 +317,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.response_sinks), InvocationStatus::Invoked(metadata) => Some(&metadata.response_sinks), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.response_sinks), + InvocationStatus::Killed(metadata) => Some(&metadata.response_sinks), _ => None, } } @@ -317,6 +329,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&metadata.timestamps), + InvocationStatus::Killed(metadata) => Some(&metadata.timestamps), InvocationStatus::Completed(completed) => Some(&completed.timestamps), _ => None, } @@ -329,6 +342,7 @@ impl InvocationStatus { InvocationStatus::Inboxed(metadata) => Some(&mut metadata.metadata.timestamps), InvocationStatus::Invoked(metadata) => Some(&mut metadata.timestamps), InvocationStatus::Suspended { metadata, .. } => Some(&mut metadata.timestamps), + InvocationStatus::Killed(metadata) => Some(&mut metadata.timestamps), InvocationStatus::Completed(completed) => Some(&mut completed.timestamps), _ => None, } @@ -337,6 +351,25 @@ impl InvocationStatus { protobuf_storage_encode_decode!(InvocationStatus, crate::storage::v1::InvocationStatusV2); +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum InvocationStatusDiscriminants { + Scheduled, + Inboxed, + Invoked, + Suspended, + Killed, + Completed, +} + +/// Lite status of an invocation. +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct InvocationLite { + pub status: InvocationStatusDiscriminants, + pub invocation_target: InvocationTarget, +} + +protobuf_storage_encode_decode!(InvocationLite, crate::storage::v1::InvocationV2Lite); + /// Wrapper used by the table implementation only for the migration, don't use it! #[derive(Debug, Default, Clone, PartialEq)] pub struct InvocationStatusV1(pub InvocationStatus); @@ -550,15 +583,23 @@ impl CompletedInvocation { } } +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct InvokedOrKilledInvocationStatusLite { + pub invocation_id: InvocationId, + pub invocation_target: InvocationTarget, + /// If true, original status is Invoked, otherwise is Killed + pub is_invoked: bool, +} + pub trait ReadOnlyInvocationStatusTable { fn get_invocation_status( &mut self, invocation_id: &InvocationId, ) -> impl Future> + Send; - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send; + ) -> impl Stream> + Send; fn all_invocation_statuses( &self, diff --git a/crates/storage-api/src/storage.rs b/crates/storage-api/src/storage.rs index e255824b1..8c3c4c1b6 100644 --- a/crates/storage-api/src/storage.rs +++ b/crates/storage-api/src/storage.rs @@ -103,10 +103,11 @@ pub mod v1 { virtual_object_status, BackgroundCallResolutionResult, DedupSequenceNumber, Duration, EnrichedEntryHeader, EntryResult, EpochSequenceNumber, Header, IdempotencyId, IdempotencyMetadata, InboxEntry, InvocationId, InvocationResolutionResult, - InvocationStatus, InvocationStatusV2, InvocationTarget, JournalEntry, JournalEntryId, - JournalMeta, KvPair, OutboxMessage, Promise, ResponseResult, SequenceNumber, ServiceId, - ServiceInvocation, ServiceInvocationResponseSink, Source, SpanContext, SpanRelation, - StateMutation, SubmitNotificationSink, Timer, VirtualObjectStatus, + InvocationStatus, InvocationStatusV2, InvocationTarget, InvocationV2Lite, JournalEntry, + JournalEntryId, JournalMeta, KvPair, OutboxMessage, Promise, ResponseResult, + SequenceNumber, ServiceId, ServiceInvocation, ServiceInvocationResponseSink, Source, + SpanContext, SpanRelation, StateMutation, SubmitNotificationSink, Timer, + VirtualObjectStatus, }; use crate::StorageError; use restate_types::errors::{IdDecodeError, InvocationError}; @@ -471,6 +472,28 @@ pub mod v1 { .collect(), }, ), + invocation_status_v2::Status::Killed => { + Ok(crate::invocation_status_table::InvocationStatus::Killed( + crate::invocation_status_table::InFlightInvocationMetadata { + response_sinks, + timestamps, + invocation_target, + journal_metadata: crate::invocation_status_table::JournalMetadata { + length: journal_length, + span_context: expect_or_fail!(span_context)?.try_into()?, + }, + pinned_deployment: derive_pinned_deployment( + deployment_id, + service_protocol_version, + )?, + source, + completion_retention_duration: completion_retention_duration + .unwrap_or_default() + .try_into()?, + idempotency_key: idempotency_key.map(ByteString::from), + }, + )) + } invocation_status_v2::Status::Completed => { Ok(crate::invocation_status_table::InvocationStatus::Completed( crate::invocation_status_table::CompletedInvocation { @@ -728,6 +751,68 @@ pub mod v1 { result: None, } } + crate::invocation_status_table::InvocationStatus::Killed( + crate::invocation_status_table::InFlightInvocationMetadata { + invocation_target, + journal_metadata, + pinned_deployment, + response_sinks, + timestamps, + source, + completion_retention_duration, + idempotency_key, + }, + ) => { + let (deployment_id, service_protocol_version) = match pinned_deployment { + None => (None, None), + Some(pinned_deployment) => ( + Some(pinned_deployment.deployment_id.to_string()), + Some(pinned_deployment.service_protocol_version.as_repr()), + ), + }; + + InvocationStatusV2 { + status: invocation_status_v2::Status::Killed.into(), + invocation_target: Some(invocation_target.into()), + source: Some(source.into()), + span_context: Some(journal_metadata.span_context.into()), + creation_time: unsafe { timestamps.creation_time() }.as_u64(), + modification_time: unsafe { timestamps.modification_time() }.as_u64(), + inboxed_transition_time: unsafe { + timestamps.inboxed_transition_time() + } + .map(|t| t.as_u64()), + scheduled_transition_time: unsafe { + timestamps.scheduled_transition_time() + } + .map(|t| t.as_u64()), + running_transition_time: unsafe { + timestamps.running_transition_time() + } + .map(|t| t.as_u64()), + completed_transition_time: unsafe { + timestamps.completed_transition_time() + } + .map(|t| t.as_u64()), + response_sinks: response_sinks + .into_iter() + .map(|s| ServiceInvocationResponseSink::from(Some(s))) + .collect(), + argument: None, + headers: vec![], + execution_time: None, + completion_retention_duration: Some( + completion_retention_duration.into(), + ), + idempotency_key: idempotency_key.map(|key| key.to_string()), + inbox_sequence_number: None, + journal_length: journal_metadata.length, + deployment_id, + service_protocol_version, + waiting_for_completed_entries: vec![], + result: None, + } + } crate::invocation_status_table::InvocationStatus::Completed( crate::invocation_status_table::CompletedInvocation { invocation_target, @@ -777,6 +862,56 @@ pub mod v1 { } } + impl TryFrom for crate::invocation_status_table::InvocationLite { + type Error = ConversionError; + + fn try_from(value: InvocationV2Lite) -> Result { + let InvocationV2Lite { + status, + invocation_target, + } = value; + + let invocation_target = expect_or_fail!(invocation_target)?.try_into()?; + let status = match status.try_into().unwrap_or_default() { + invocation_status_v2::Status::Scheduled => { + crate::invocation_status_table::InvocationStatusDiscriminants::Scheduled + } + invocation_status_v2::Status::Inboxed => { + crate::invocation_status_table::InvocationStatusDiscriminants::Inboxed + } + invocation_status_v2::Status::Invoked => { + crate::invocation_status_table::InvocationStatusDiscriminants::Invoked + } + invocation_status_v2::Status::Suspended => { + crate::invocation_status_table::InvocationStatusDiscriminants::Suspended + } + invocation_status_v2::Status::Killed => { + crate::invocation_status_table::InvocationStatusDiscriminants::Killed + } + invocation_status_v2::Status::Completed => { + crate::invocation_status_table::InvocationStatusDiscriminants::Completed + } + _ => { + return Err(ConversionError::unexpected_enum_variant( + "status", + value.status, + )) + } + }; + + Ok((crate::invocation_status_table::InvocationLite { + status, + invocation_target, + })) + } + } + + impl From for InvocationV2Lite { + fn from(_: crate::invocation_status_table::InvocationLite) -> Self { + panic!("Unexpected usage of InvocationLite, this data structure can be used only for reading, and never for writing") + } + } + impl TryFrom for crate::invocation_status_table::InvocationStatusV1 { type Error = ConversionError; @@ -856,6 +991,9 @@ pub mod v1 { crate::invocation_status_table::InvocationStatus::Scheduled(_) => { panic!("Unexpected conversion to old InvocationStatus when using Scheduled variant. This is a bug in the table implementation.") } + crate::invocation_status_table::InvocationStatus::Killed(_) => { + panic!("Unexpected conversion to old InvocationStatus when using Killed variant. This is a bug in the table implementation.") + } }; InvocationStatus { @@ -966,6 +1104,7 @@ pub mod v1 { source, completion_retention_duration: completion_retention_time, idempotency_key, + .. } = value; let (deployment_id, service_protocol_version) = match pinned_deployment { diff --git a/crates/storage-api/src/timer_table/mod.rs b/crates/storage-api/src/timer_table/mod.rs index 67a7484b9..bb8e584c4 100644 --- a/crates/storage-api/src/timer_table/mod.rs +++ b/crates/storage-api/src/timer_table/mod.rs @@ -49,7 +49,7 @@ impl TimerKey { } } - fn neo_invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { + pub fn neo_invoke(timestamp: u64, invocation_uuid: InvocationUuid) -> Self { TimerKey { timestamp, kind: TimerKeyKind::NeoInvoke { invocation_uuid }, diff --git a/crates/storage-query-datafusion/src/invocation_status/row.rs b/crates/storage-query-datafusion/src/invocation_status/row.rs index b61b2c2ff..2d010deea 100644 --- a/crates/storage-query-datafusion/src/invocation_status/row.rs +++ b/crates/storage-query-datafusion/src/invocation_status/row.rs @@ -83,6 +83,10 @@ pub(crate) fn append_invocation_status_row( row.status("suspended"); fill_in_flight_invocation_metadata(&mut row, output, metadata); } + InvocationStatus::Killed(metadata) => { + row.status("killed"); + fill_in_flight_invocation_metadata(&mut row, output, metadata); + } InvocationStatus::Free => { row.status("free"); } diff --git a/crates/storage-query-datafusion/src/invocation_status/schema.rs b/crates/storage-query-datafusion/src/invocation_status/schema.rs index 3391728dc..76780c2f5 100644 --- a/crates/storage-query-datafusion/src/invocation_status/schema.rs +++ b/crates/storage-query-datafusion/src/invocation_status/schema.rs @@ -21,7 +21,7 @@ define_table!(sys_invocation_status( /// [Invocation ID](/operate/invocation#invocation-identifier). id: DataType::LargeUtf8, - /// Either `inboxed` or `scheduled` or `invoked` or `suspended` or `completed` + /// Either `inboxed` or `scheduled` or `invoked` or `suspended` or `killed` or `completed` status: DataType::LargeUtf8, /// If `status = 'completed'`, this contains either `success` or `failure` diff --git a/crates/types/src/config/worker.rs b/crates/types/src/config/worker.rs index 187f1818b..7e0ecada1 100644 --- a/crates/types/src/config/worker.rs +++ b/crates/types/src/config/worker.rs @@ -51,6 +51,9 @@ pub struct WorkerOptions { #[cfg_attr(feature = "schemars", schemars(skip))] experimental_feature_disable_idempotency_table: bool, + #[cfg_attr(feature = "schemars", schemars(skip))] + experimental_feature_invocation_status_killed: bool, + pub storage: StorageOptions, pub invoker: InvokerOptions, @@ -88,6 +91,10 @@ impl WorkerOptions { pub fn experimental_feature_disable_idempotency_table(&self) -> bool { self.experimental_feature_disable_idempotency_table } + + pub fn experimental_feature_invocation_status_killed(&self) -> bool { + self.experimental_feature_invocation_status_killed + } } impl Default for WorkerOptions { @@ -97,6 +104,7 @@ impl Default for WorkerOptions { num_timers_in_memory_limit: None, cleanup_interval: Duration::from_secs(60 * 60).into(), experimental_feature_disable_idempotency_table: false, + experimental_feature_invocation_status_killed: false, storage: StorageOptions::default(), invoker: Default::default(), max_command_batch_size: NonZeroUsize::new(4).expect("Non zero number"), diff --git a/crates/worker/Cargo.toml b/crates/worker/Cargo.toml index 214383ac3..1161043f8 100644 --- a/crates/worker/Cargo.toml +++ b/crates/worker/Cargo.toml @@ -51,6 +51,7 @@ codederror = { workspace = true } derive_builder = { workspace = true } derive_more = { workspace = true } futures = { workspace = true } +enumset = { workspace = true } humantime = { workspace = true } itertools = { workspace = true } metrics = { workspace = true } diff --git a/crates/worker/src/partition/cleaner.rs b/crates/worker/src/partition/cleaner.rs index bc2c8c493..b2169eb27 100644 --- a/crates/worker/src/partition/cleaner.rs +++ b/crates/worker/src/partition/cleaner.rs @@ -173,9 +173,9 @@ mod tests { use restate_core::{TaskKind, TestCoreEnvBuilder}; use restate_storage_api::invocation_status_table::{ CompletedInvocation, InFlightInvocationMetadata, InvocationStatus, + InvokedOrKilledInvocationStatusLite, }; use restate_types::identifiers::{InvocationId, InvocationUuid}; - use restate_types::invocation::InvocationTarget; use restate_types::partition_table::{FindPartition, PartitionTable}; use restate_types::Version; use std::future::Future; @@ -194,9 +194,9 @@ mod tests { std::future::pending() } - fn all_invoked_invocations( + fn all_invoked_or_killed_invocations( &mut self, - ) -> impl Stream> + Send + ) -> impl Stream> + Send { todo!(); #[allow(unreachable_code)] diff --git a/crates/worker/src/partition/leadership.rs b/crates/worker/src/partition/leadership.rs index d8be3056b..d8863c139 100644 --- a/crates/worker/src/partition/leadership.rs +++ b/crates/worker/src/partition/leadership.rs @@ -8,6 +8,10 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. +use futures::future::OptionFuture; +use futures::stream::FuturesUnordered; +use futures::{stream, FutureExt, StreamExt, TryStreamExt}; +use metrics::counter; use std::cmp::Ordering; use std::collections::hash_map::Entry; use std::collections::{HashMap, VecDeque}; @@ -19,11 +23,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; use std::time::{Duration, SystemTime}; - -use futures::future::OptionFuture; -use futures::stream::FuturesUnordered; -use futures::{stream, FutureExt, StreamExt, TryStreamExt}; -use metrics::counter; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tracing::{debug, info, instrument, trace, warn}; @@ -37,10 +36,13 @@ use restate_errors::NotRunningError; use restate_invoker_api::InvokeInputJournal; use restate_partition_store::PartitionStore; use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber}; -use restate_storage_api::invocation_status_table::ReadOnlyInvocationStatusTable; +use restate_storage_api::invocation_status_table::{ + InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable, +}; use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable}; use restate_storage_api::timer_table::{TimerKey, TimerTable}; use restate_timer::TokioClock; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::{ InvocationId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; @@ -63,6 +65,7 @@ use crate::partition::cleaner::Cleaner; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::shuffle::{HintSender, OutboxReaderError, Shuffle, ShuffleMetadata}; use crate::partition::state_machine::Action; +use crate::partition::types::{InvokerEffect, InvokerEffectKind}; use crate::partition::{respond_to_rpc, shuffle}; const BATCH_READY_UP_TO: usize = 10; @@ -94,6 +97,9 @@ pub(crate) enum ActionEffect { AwaitingRpcSelfProposeDone, } +type InvokerStream = + stream::Chain>, ReceiverStream>; + pub(crate) struct LeaderState { leader_epoch: LeaderEpoch, shuffle_hint_tx: HintSender, @@ -107,7 +113,7 @@ pub(crate) struct LeaderState { >, awaiting_rpc_self_propose: FuturesUnordered, - invoker_stream: ReceiverStream, + invoker_stream: InvokerStream, shuffle_stream: ReceiverStream, pending_cleanup_timers_to_schedule: VecDeque<(InvocationId, Duration)>, cleaner_task_id: TaskId, @@ -362,7 +368,7 @@ where async fn become_leader(&mut self, partition_store: &mut PartitionStore) -> Result<(), Error> { if let State::Candidate { leader_epoch, .. } = self.state { - let invoker_rx = Self::resume_invoked_invocations( + let invoker_stream = Self::resume_invoked_invocations( &mut self.invoker_tx, (self.partition_processor_metadata.partition_id, leader_epoch), self.partition_processor_metadata @@ -437,7 +443,7 @@ where self_proposer, awaiting_rpc_actions: Default::default(), awaiting_rpc_self_propose: Default::default(), - invoker_stream: ReceiverStream::new(invoker_rx), + invoker_stream, shuffle_stream: ReceiverStream::new(shuffle_rx), pending_cleanup_timers_to_schedule: Default::default(), }); @@ -454,9 +460,11 @@ where partition_key_range: RangeInclusive, partition_store: &mut PartitionStore, channel_size: usize, - ) -> Result, Error> { + ) -> Result { let (invoker_tx, invoker_rx) = mpsc::channel(channel_size); + let mut killed_invocations_effects = vec![]; + invoker_handle .register_partition( partition_leader_epoch, @@ -468,27 +476,43 @@ where .map_err(Error::Invoker)?; { - let invoked_invocations = partition_store.all_invoked_invocations(); + let invoked_invocations = partition_store.all_invoked_or_killed_invocations(); tokio::pin!(invoked_invocations); let mut count = 0; - while let Some(invocation_id_and_target) = invoked_invocations.next().await { - let (invocation_id, invocation_target) = invocation_id_and_target?; - invoker_handle - .invoke( - partition_leader_epoch, + while let Some(invoked_invocation) = invoked_invocations.next().await { + let InvokedOrKilledInvocationStatusLite { + invocation_id, + invocation_target, + is_invoked, + } = invoked_invocation?; + if is_invoked { + invoker_handle + .invoke( + partition_leader_epoch, + invocation_id, + invocation_target, + InvokeInputJournal::NoCachedJournal, + ) + .await + .map_err(Error::Invoker)?; + } else { + // For killed invocations, there's no need to go through the invoker + // We simply return here the effect as if the invoker produced that. + killed_invocations_effects.push(InvokerEffect { invocation_id, - invocation_target, - InvokeInputJournal::NoCachedJournal, - ) - .await - .map_err(Error::Invoker)?; + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + }); + } count += 1; } debug!("Leader partition resumed {} invocations", count); } - Ok(invoker_rx) + Ok( + futures::stream::iter(killed_invocations_effects) + .chain(ReceiverStream::new(invoker_rx)), + ) } async fn become_follower(&mut self) { @@ -639,8 +663,11 @@ where .notify_completion(partition_leader_epoch, invocation_id, completion) .await .map_err(Error::Invoker)?, - Action::AbortInvocation(invocation_id) => invoker_tx - .abort_invocation(partition_leader_epoch, invocation_id) + Action::AbortInvocation { + invocation_id, + acknowledge, + } => invoker_tx + .abort_invocation(partition_leader_epoch, invocation_id, acknowledge) .await .map_err(Error::Invoker)?, Action::IngressResponse { diff --git a/crates/worker/src/partition/mod.rs b/crates/worker/src/partition/mod.rs index 9e7e00099..dbcbfb684 100644 --- a/crates/worker/src/partition/mod.rs +++ b/crates/worker/src/partition/mod.rs @@ -15,6 +15,7 @@ use std::time::{Duration, Instant}; use anyhow::Context; use assert2::let_assert; +use enumset::EnumSet; use futures::{FutureExt, Stream, StreamExt, TryStreamExt as _}; use metrics::{counter, histogram}; use tokio::sync::{mpsc, watch}; @@ -41,6 +42,7 @@ use restate_storage_api::service_status_table::{ use restate_storage_api::{StorageError, Transaction}; use restate_types::cluster::cluster_state::{PartitionProcessorStatus, ReplayStatus, RunMode}; use restate_types::config::WorkerOptions; +use restate_types::errors::KILLED_INVOCATION_ERROR; use restate_types::identifiers::{ LeaderEpoch, PartitionId, PartitionKey, PartitionProcessorRpcRequestId, WithPartitionKey, }; @@ -68,7 +70,7 @@ use crate::metric_definitions::{ }; use crate::partition::invoker_storage_reader::InvokerStorageReader; use crate::partition::leadership::{LeadershipState, PartitionProcessorMetadata}; -use crate::partition::state_machine::{ActionCollector, StateMachine}; +use crate::partition::state_machine::{ActionCollector, ExperimentalFeature, StateMachine}; mod cleaner; pub mod invoker_storage_reader; @@ -91,6 +93,7 @@ pub(super) struct PartitionProcessorBuilder { num_timers_in_memory_limit: Option, disable_idempotency_table: bool, + invocation_status_killed: bool, cleanup_interval: Duration, channel_size: usize, max_command_batch_size: usize, @@ -126,6 +129,7 @@ where status, num_timers_in_memory_limit: options.num_timers_in_memory_limit(), disable_idempotency_table: options.experimental_feature_disable_idempotency_table(), + invocation_status_killed: options.experimental_feature_invocation_status_killed(), cleanup_interval: options.cleanup_interval(), channel_size: options.internal_queue_length(), max_command_batch_size: options.max_command_batch_size(), @@ -148,6 +152,7 @@ where num_timers_in_memory_limit, cleanup_interval, disable_idempotency_table, + invocation_status_killed, channel_size, max_command_batch_size, invoker_tx, @@ -162,6 +167,7 @@ where &mut partition_store, partition_key_range.clone(), disable_idempotency_table, + invocation_status_killed, ) .await?; @@ -211,6 +217,7 @@ where partition_store: &mut PartitionStore, partition_key_range: RangeInclusive, disable_idempotency_table: bool, + invocation_status_killed: bool, ) -> Result, StorageError> where Codec: RawEntryCodec + Default + Debug, @@ -219,12 +226,22 @@ where let outbox_seq_number = partition_store.get_outbox_seq_number().await?; let outbox_head_seq_number = partition_store.get_outbox_head_seq_number().await?; + let experimental_features = if disable_idempotency_table { + ExperimentalFeature::DisableIdempotencyTable.into() + } else { + EnumSet::empty() + } | if invocation_status_killed { + ExperimentalFeature::InvocationStatusKilled.into() + } else { + EnumSet::empty() + }; + let state_machine = StateMachine::new( inbox_seq_number, outbox_seq_number, outbox_head_seq_number, partition_key_range, - disable_idempotency_table, + experimental_features, ); Ok(state_machine) @@ -693,6 +710,14 @@ where completion_expiry_time, })) } + InvocationStatus::Killed(_) => { + Ok(PartitionProcessorRpcResponse::Output(InvocationOutput { + request_id, + response: IngressResponseResult::Failure(KILLED_INVOCATION_ERROR), + invocation_id: Some(invocation_id), + completion_expiry_time: None, + })) + } _ => Ok(PartitionProcessorRpcResponse::NotReady), } } diff --git a/crates/worker/src/partition/state_machine/actions.rs b/crates/worker/src/partition/state_machine/actions.rs index 723c8b1a2..67dffe1a6 100644 --- a/crates/worker/src/partition/state_machine/actions.rs +++ b/crates/worker/src/partition/state_machine/actions.rs @@ -47,7 +47,10 @@ pub enum Action { invocation_id: InvocationId, completion: Completion, }, - AbortInvocation(InvocationId), + AbortInvocation { + invocation_id: InvocationId, + acknowledge: bool, + }, IngressResponse { request_id: PartitionProcessorRpcRequestId, invocation_id: Option, diff --git a/crates/worker/src/partition/state_machine/mod.rs b/crates/worker/src/partition/state_machine/mod.rs index 9677fae80..4d232e060 100644 --- a/crates/worker/src/partition/state_machine/mod.rs +++ b/crates/worker/src/partition/state_machine/mod.rs @@ -18,6 +18,7 @@ pub use actions::{Action, ActionCollector}; use assert2::let_assert; use bytes::Bytes; use bytestring::ByteString; +use enumset::EnumSet; use futures::{StreamExt, TryStreamExt}; use metrics::{histogram, Histogram}; use restate_invoker_api::InvokeInputJournal; @@ -45,10 +46,9 @@ use restate_storage_api::Result as StorageResult; use restate_tracing_instrumentation as instrumentation; use restate_types::deployment::PinnedDeployment; use restate_types::errors::{ - InvocationError, InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, - ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, CANCELED_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, - NOT_FOUND_INVOCATION_ERROR, NOT_READY_INVOCATION_ERROR, - WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR, + InvocationErrorCode, ALREADY_COMPLETED_INVOCATION_ERROR, ATTACH_NOT_SUPPORTED_INVOCATION_ERROR, + CANCELED_INVOCATION_ERROR, KILLED_INVOCATION_ERROR, NOT_FOUND_INVOCATION_ERROR, + NOT_READY_INVOCATION_ERROR, WORKFLOW_ALREADY_INVOKED_INVOCATION_ERROR, }; use restate_types::identifiers::{ EntryIndex, InvocationId, PartitionKey, PartitionProcessorRpcRequestId, ServiceId, @@ -90,6 +90,16 @@ use std::time::Instant; use tracing::error; use utils::SpanExt; +#[derive(Debug, Hash, enumset::EnumSetType, strum::Display)] +pub enum ExperimentalFeature { + /// This is used to disable writing to idempotency table/virtual object status table for idempotent invocations/workflow invocations. + /// From Restate 1.2 invocation ids are generated deterministically, so this additional index is not needed. + DisableIdempotencyTable, + /// If true, kill should wait for end signal from invoker, in order to implement the restart functionality. + /// This is enabled by experimental_feature_kill_and_restart. + InvocationStatusKilled, +} + pub struct StateMachine { // initialized from persistent storage inbox_seq_number: MessageIndex, @@ -100,9 +110,8 @@ pub struct StateMachine { partition_key_range: RangeInclusive, latency: Histogram, - /// This is used to disable writing to idempotency table/virtual object status table for idempotent invocations/workflow invocations. - /// From Restate 1.2 invocation ids are generated deterministically, so this additional index is not needed. - disable_idempotency_table: bool, + /// Enabled experimental features. + experimental_features: EnumSet, _codec: PhantomData, } @@ -160,7 +169,7 @@ impl StateMachine { outbox_seq_number: MessageIndex, outbox_head_seq_number: Option, partition_key_range: RangeInclusive, - disable_idempotency_table: bool, + experimental_features: EnumSet, ) -> Self { let latency = histogram!(crate::metric_definitions::PARTITION_HANDLE_INVOKER_EFFECT_COMMAND); @@ -170,7 +179,7 @@ impl StateMachine { outbox_head_seq_number, partition_key_range, latency, - disable_idempotency_table, + experimental_features, _codec: PhantomData, } } @@ -287,11 +296,11 @@ impl StateMachine { } Command::Timer(timer) => self.on_timer(&mut ctx, timer).await, Command::TerminateInvocation(invocation_termination) => { - self.try_terminate_invocation(&mut ctx, invocation_termination) + self.on_terminate_invocation(&mut ctx, invocation_termination) .await } Command::PurgeInvocation(purge_invocation_request) => { - self.try_purge_invocation(&mut ctx, purge_invocation_request.invocation_id) + self.on_purge_invocation(&mut ctx, purge_invocation_request.invocation_id) .await } Command::PatchState(mutation) => { @@ -482,7 +491,11 @@ impl StateMachine { // Store the invocation id mapping if we have to and continue the processing // TODO get rid of this code when we remove the usage of the virtual object table for workflows - if is_workflow_run && !self.disable_idempotency_table { + if is_workflow_run + && !self + .experimental_features + .contains(ExperimentalFeature::DisableIdempotencyTable) + { ctx.storage .put_virtual_object_status( &service_invocation @@ -494,7 +507,11 @@ impl StateMachine { .await; } // TODO get rid of this code when we remove the idempotency table - if has_idempotency_key && !self.disable_idempotency_table { + if has_idempotency_key + && !self + .experimental_features + .contains(ExperimentalFeature::DisableIdempotencyTable) + { Self::do_store_idempotency_id( ctx, service_invocation @@ -564,6 +581,17 @@ impl StateMachine { } } } + InvocationStatus::Killed(metadata) => { + self.send_response_to_sinks( + ctx, + service_invocation.response_sink.take().into_iter(), + KILLED_INVOCATION_ERROR, + Some(invocation_id), + None, + Some(&metadata.invocation_target), + ) + .await?; + } InvocationStatus::Completed(completed) => { // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. let completion_expiry_time = unsafe { completed.completion_expiry_time() }; @@ -810,7 +838,7 @@ impl StateMachine { Ok(()) } - async fn try_terminate_invocation< + async fn on_terminate_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -828,12 +856,12 @@ impl StateMachine { }: InvocationTermination, ) -> Result<(), Error> { match termination_flavor { - TerminationFlavor::Kill => self.try_kill_invocation(ctx, invocation_id).await, - TerminationFlavor::Cancel => self.try_cancel_invocation(ctx, invocation_id).await, + TerminationFlavor::Kill => self.on_kill_invocation(ctx, invocation_id).await, + TerminationFlavor::Cancel => self.on_cancel_invocation(ctx, invocation_id).await, } } - async fn try_kill_invocation< + async fn on_kill_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -841,6 +869,7 @@ impl StateMachine { + StateTable + JournalTable + OutboxTable + + TimerTable + FsmTable, >( &mut self, @@ -850,8 +879,13 @@ impl StateMachine { let status = ctx.get_invocation_status(&invocation_id).await?; match status { - InvocationStatus::Invoked(metadata) | InvocationStatus::Suspended { metadata, .. } => { - self.kill_invocation(ctx, invocation_id, metadata).await?; + InvocationStatus::Invoked(metadata) => { + self.kill_invoked_invocation(ctx, invocation_id, metadata) + .await?; + } + InvocationStatus::Suspended { metadata, .. } => { + self.kill_suspended_invocation(ctx, invocation_id, metadata) + .await?; } InvocationStatus::Inboxed(inboxed) => { self.terminate_inboxed_invocation( @@ -862,7 +896,23 @@ impl StateMachine { ) .await? } - _ => { + InvocationStatus::Scheduled(scheduled) => { + self.terminate_scheduled_invocation( + ctx, + TerminationFlavor::Kill, + invocation_id, + scheduled, + ) + .await? + } + InvocationStatus::Killed(_) => { + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received kill command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { trace!("Received kill command for unknown invocation with id '{invocation_id}'."); // We still try to send the abort signal to the invoker, // as it might be the case that previously the user sent an abort signal @@ -870,14 +920,14 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; Ok(()) } - async fn try_cancel_invocation< + async fn on_cancel_invocation< State: VirtualObjectStatusTable + InvocationStatusTable + InboxTable @@ -928,7 +978,23 @@ impl StateMachine { ) .await? } - _ => { + InvocationStatus::Scheduled(scheduled) => { + self.terminate_scheduled_invocation( + ctx, + TerminationFlavor::Cancel, + invocation_id, + scheduled, + ) + .await? + } + InvocationStatus::Killed(_) => { + // Nothing to do here really, let's send again the abort signal to the invoker just in case + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } + InvocationStatus::Completed(_) => { + debug!("Received cancel command for completed invocation '{invocation_id}'. To cleanup the invocation after it's been completed, use the purge invocation command."); + } + InvocationStatus::Free => { trace!("Received cancel command for unknown invocation with id '{invocation_id}'."); // We still try to send the abort signal to the invoker, // as it might be the case that previously the user sent an abort signal @@ -936,7 +1002,8 @@ impl StateMachine { // This can happen because the invoke/resume and the abort invoker messages end up in different queues, // and the abort message can overtake the invoke/resume. // Consequently the invoker might have not received the abort and the user tried to send it again. - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + // TODO + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); } }; @@ -1002,7 +1069,67 @@ impl StateMachine { Ok(()) } - async fn kill_invocation< + async fn terminate_scheduled_invocation< + State: InvocationStatusTable + TimerTable + OutboxTable + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + termination_flavor: TerminationFlavor, + invocation_id: InvocationId, + scheduled_invocation: ScheduledInvocation, + ) -> Result<(), Error> { + let error = match termination_flavor { + TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + }; + + let ScheduledInvocation { + metadata: + PreFlightInvocationMetadata { + response_sinks, + span_context, + invocation_target, + execution_time, + .. + }, + } = scheduled_invocation; + + // Reply back to callers with error, and publish end trace + self.send_response_to_sinks( + ctx, + response_sinks, + &error, + Some(invocation_id), + None, + Some(&invocation_target), + ) + .await?; + + // Delete timer + if let Some(execution_time) = execution_time { + Self::do_delete_timer( + ctx, + TimerKey::neo_invoke(execution_time.as_u64(), invocation_id.invocation_uuid()), + ) + .await?; + } else { + warn!("Scheduled invocations must always have an execution time."); + } + Self::do_free_invocation(ctx, invocation_id).await; + + self.notify_invocation_result( + ctx, + invocation_id, + invocation_target, + span_context, + MillisSinceEpoch::now(), + Err((error.code(), error.to_string())), + ); + + Ok(()) + } + + async fn kill_invoked_invocation< State: InboxTable + VirtualObjectStatusTable + InvocationStatusTable @@ -1020,9 +1147,61 @@ impl StateMachine { self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length) .await?; - self.fail_invocation(ctx, invocation_id, metadata, KILLED_INVOCATION_ERROR) + if self + .experimental_features + .contains(ExperimentalFeature::InvocationStatusKilled) + { + debug_if_leader!( + ctx.is_leader, + restate.invocation.id = %invocation_id, + "Effect: Store killed invocation" + ); + + ctx.storage + .put_invocation_status(&invocation_id, &InvocationStatus::Killed(metadata)) + .await; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, true); + } else { + self.end_invocation( + ctx, + invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) + .await?; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + } + Ok(()) + } + + async fn kill_suspended_invocation< + State: InboxTable + + VirtualObjectStatusTable + + InvocationStatusTable + + VirtualObjectStatusTable + + StateTable + + JournalTable + + OutboxTable + + FsmTable, + >( + &mut self, + ctx: &mut StateMachineApplyContext<'_, State>, + invocation_id: InvocationId, + metadata: InFlightInvocationMetadata, + ) -> Result<(), Error> { + self.kill_child_invocations(ctx, &invocation_id, metadata.journal_metadata.length) .await?; - Self::do_send_abort_invocation_to_invoker(ctx, invocation_id); + + // No need to go through the Killed state when we're suspended, + // because it means we already got a terminal state from the invoker. + self.end_invocation( + ctx, + invocation_id, + metadata, + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)), + ) + .await?; + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); Ok(()) } @@ -1179,7 +1358,7 @@ impl StateMachine { } } - async fn try_purge_invocation< + async fn on_purge_invocation< State: InvocationStatusTable + IdempotencyTable + VirtualObjectStatusTable @@ -1276,7 +1455,7 @@ impl StateMachine { self.on_service_invocation(ctx, service_invocation).await } Timer::CleanInvocationStatus(invocation_id) => { - self.try_purge_invocation(ctx, invocation_id).await + self.on_purge_invocation(ctx, invocation_id).await } Timer::NeoInvoke(invocation_id) => self.on_neo_invoke_timer(ctx, invocation_id).await, } @@ -1355,17 +1534,7 @@ impl StateMachine { let status = ctx .get_invocation_status(&invoker_effect.invocation_id) .await?; - - match status { - InvocationStatus::Invoked(invocation_metadata) => { - self.on_invoker_effect(ctx, invoker_effect, invocation_metadata) - .await? - } - _ => { - trace!("Received invoker effect for unknown service invocation. Ignoring the effect and aborting."); - Self::do_send_abort_invocation_to_invoker(ctx, invoker_effect.invocation_id); - } - }; + self.on_invoker_effect(ctx, invoker_effect, status).await?; self.latency.record(start.elapsed()); Ok(()) @@ -1388,8 +1557,29 @@ impl StateMachine { invocation_id, kind, }: InvokerEffect, - invocation_metadata: InFlightInvocationMetadata, + invocation_status: InvocationStatus, ) -> Result<(), Error> { + let is_status_invoked = matches!(invocation_status, InvocationStatus::Invoked(_)); + let is_status_killed = matches!(invocation_status, InvocationStatus::Killed(_)); + + if !is_status_invoked && !is_status_killed { + trace!("Received invoker effect for invocation not in invoked nor killed status. Ignoring the effect."); + Self::do_send_abort_invocation_to_invoker(ctx, invocation_id, false); + return Ok(()); + } + if is_status_killed + && !matches!(kind, InvokerEffectKind::Failed(_) | InvokerEffectKind::End) + { + warn!( + "Received non terminal invoker effect for killed invocation. Ignoring the effect." + ); + return Ok(()); + } + + let invocation_metadata = invocation_status + .into_invocation_metadata() + .expect("Must be present if status is killed or invoked"); + match kind { InvokerEffectKind::PinnedDeployment(pinned_deployment) => { Self::do_store_pinned_deployment( @@ -1447,12 +1637,27 @@ impl StateMachine { } } InvokerEffectKind::End => { - self.end_invocation(ctx, invocation_id, invocation_metadata) - .await?; + self.end_invocation( + ctx, + invocation_id, + invocation_metadata, + if is_status_killed { + // It doesn't matter that the invocation successfully completed, we return failed anyway in this case. + Some(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + } else { + None + }, + ) + .await?; } InvokerEffectKind::Failed(e) => { - self.fail_invocation(ctx, invocation_id, invocation_metadata, e) - .await?; + self.end_invocation( + ctx, + invocation_id, + invocation_metadata, + Some(ResponseResult::Failure(e)), + ) + .await?; } } @@ -1472,26 +1677,19 @@ impl StateMachine { ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, invocation_metadata: InFlightInvocationMetadata, + // If given, this will override any Output Entry available in the journal table + response_result_override: Option, ) -> Result<(), Error> { + let invocation_target = invocation_metadata.invocation_target.clone(); let journal_length = invocation_metadata.journal_metadata.length; let completion_retention_time = invocation_metadata.completion_retention_duration; - self.notify_invocation_result( - ctx, - invocation_id, - invocation_metadata.invocation_target.clone(), - invocation_metadata.journal_metadata.span_context.clone(), - unsafe { invocation_metadata.timestamps.creation_time() }, - Ok(()), - ); - - // Pop from inbox - Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; - // If there are any response sinks, or we need to store back the completed status, // we need to find the latest output entry if !invocation_metadata.response_sinks.is_empty() || !completion_retention_time.is_zero() { - let result = if let Some(output_entry) = self + let response_result = if let Some(response_result) = response_result_override { + response_result + } else if let Some(output_entry) = self .read_last_output_entry(ctx, &invocation_id, journal_length) .await? { @@ -1506,21 +1704,44 @@ impl StateMachine { self.send_response_to_sinks( ctx, invocation_metadata.response_sinks.clone(), - result.clone(), + response_result.clone(), Some(invocation_id), None, Some(&invocation_metadata.invocation_target), ) .await?; + // Notify invocation result + self.notify_invocation_result( + ctx, + invocation_id, + invocation_metadata.invocation_target.clone(), + invocation_metadata.journal_metadata.span_context.clone(), + unsafe { invocation_metadata.timestamps.creation_time() }, + match &response_result { + ResponseResult::Success(_) => Ok(()), + ResponseResult::Failure(err) => Err((err.code(), err.message().to_owned())), + }, + ); + // Store the completed status, if needed if !completion_retention_time.is_zero() { let completed_invocation = CompletedInvocation::from_in_flight_invocation_metadata( invocation_metadata, - result, + response_result, ); Self::do_store_completed_invocation(ctx, invocation_id, completed_invocation).await; } + } else { + // Just notify Ok, no need to read the output entry + self.notify_invocation_result( + ctx, + invocation_id, + invocation_target.clone(), + invocation_metadata.journal_metadata.span_context.clone(), + unsafe { invocation_metadata.timestamps.creation_time() }, + Ok(()), + ); } // If no retention, immediately cleanup the invocation status @@ -1529,64 +1750,8 @@ impl StateMachine { } Self::do_drop_journal(ctx, invocation_id, journal_length).await; - Ok(()) - } - - async fn fail_invocation< - State: InboxTable - + VirtualObjectStatusTable - + InvocationStatusTable - + VirtualObjectStatusTable - + StateTable - + JournalTable - + OutboxTable - + FsmTable, - >( - &mut self, - ctx: &mut StateMachineApplyContext<'_, State>, - invocation_id: InvocationId, - invocation_metadata: InFlightInvocationMetadata, - error: InvocationError, - ) -> Result<(), Error> { - let journal_length = invocation_metadata.journal_metadata.length; - - self.notify_invocation_result( - ctx, - invocation_id, - invocation_metadata.invocation_target.clone(), - invocation_metadata.journal_metadata.span_context.clone(), - unsafe { invocation_metadata.timestamps.creation_time() }, - Err((error.code(), error.to_string())), - ); - - let response_result = ResponseResult::from(error); - - // Send responses out - self.send_response_to_sinks( - ctx, - invocation_metadata.response_sinks.clone(), - response_result.clone(), - Some(invocation_id), - None, - Some(&invocation_metadata.invocation_target), - ) - .await?; - - // Pop from inbox - Self::consume_inbox(ctx, &invocation_metadata.invocation_target).await?; - - // Store the completed status or free it - if !invocation_metadata.completion_retention_duration.is_zero() { - let completed_invocation = CompletedInvocation::from_in_flight_invocation_metadata( - invocation_metadata, - response_result, - ); - Self::do_store_completed_invocation(ctx, invocation_id, completed_invocation).await; - } else { - Self::do_free_invocation(ctx, invocation_id).await; - } - - Self::do_drop_journal(ctx, invocation_id, journal_length).await; + // Consume inbox and move on + Self::consume_inbox(ctx, &invocation_target).await?; Ok(()) } @@ -2785,6 +2950,17 @@ impl StateMachine { .await?; } } + InvocationStatus::Killed(metadata) => { + self.send_response_to_sinks( + ctx, + vec![attach_invocation_request.response_sink], + KILLED_INVOCATION_ERROR, + Some(invocation_id), + None, + Some(&metadata.invocation_target), + ) + .await?; + } InvocationStatus::Completed(completed) => { // SAFETY: We use this field to send back the notification to ingress, and not as part of the PP deterministic logic. let completion_expiry_time = unsafe { completed.completion_expiry_time() }; @@ -3487,11 +3663,14 @@ impl StateMachine { fn do_send_abort_invocation_to_invoker( ctx: &mut StateMachineApplyContext<'_, State>, invocation_id: InvocationId, + acknowledge: bool, ) { debug_if_leader!(ctx.is_leader, restate.invocation.id = %invocation_id, "Effect: Send abort command to invoker"); - ctx.action_collector - .push(Action::AbortInvocation(invocation_id)); + ctx.action_collector.push(Action::AbortInvocation { + invocation_id, + acknowledge, + }); } async fn do_mutate_state( diff --git a/crates/worker/src/partition/state_machine/tests/idempotency.rs b/crates/worker/src/partition/state_machine/tests/idempotency.rs index 7cfe80c80..bbfb15af3 100644 --- a/crates/worker/src/partition/state_machine/tests/idempotency.rs +++ b/crates/worker/src/partition/state_machine/tests/idempotency.rs @@ -25,11 +25,13 @@ use rstest::*; use std::time::Duration; #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn start_and_complete_idempotent_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -59,7 +61,7 @@ async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_ta ); // Assert idempotency key mapping exists only with idempotency table writes enabled - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -131,13 +133,13 @@ async fn start_and_complete_idempotent_invocation(#[case] disable_idempotency_ta } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] async fn start_and_complete_idempotent_invocation_neo_table( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -167,7 +169,7 @@ async fn start_and_complete_idempotent_invocation_neo_table( ); // Assert idempotency key mapping exists only with idempotency table writes enabled - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -243,11 +245,13 @@ async fn start_and_complete_idempotent_invocation_neo_table( } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn complete_already_completed_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn complete_already_completed_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); @@ -302,13 +306,13 @@ async fn complete_already_completed_invocation(#[case] disable_idempotency_table } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] async fn attach_with_service_invocation_command_while_executing( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -399,16 +403,16 @@ async fn attach_with_service_invocation_command_while_executing( } #[rstest] -#[case(true, true)] -#[case(true, false)] -#[case(false, true)] -#[case(false, false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into(), true)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into(), false)] +#[case(EnumSet::empty(), true)] +#[case(EnumSet::empty(), false)] #[tokio::test] async fn attach_with_send_service_invocation( - #[case] disable_idempotency_table: bool, + #[case] experimental_features: EnumSet, #[case] use_same_request_id: bool, ) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let retention = Duration::from_secs(60) * 60 * 24; @@ -524,11 +528,13 @@ async fn attach_with_send_service_invocation( } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn attach_inboxed_with_send_service_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_inboxed_with_send_service_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let request_id_1 = PartitionProcessorRpcRequestId::default(); @@ -620,11 +626,11 @@ async fn attach_inboxed_with_send_service_invocation(#[case] disable_idempotency } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn attach_command(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_command(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let completion_retention = Duration::from_secs(60) * 60 * 24; @@ -773,11 +779,13 @@ async fn attach_command_without_blocking_inflight() { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn purge_completed_idempotent_invocation(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn purge_completed_idempotent_invocation( + #[case] experimental_features: EnumSet, +) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let idempotency_key = ByteString::from_static("my-idempotency-key"); let invocation_target = InvocationTarget::mock_virtual_object(); diff --git a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs index f90560495..4efeb97c3 100644 --- a/crates/worker/src/partition/state_machine/tests/kill_cancel.rs +++ b/crates/worker/src/partition/state_machine/tests/kill_cancel.rs @@ -14,17 +14,24 @@ use assert2::assert; use assert2::let_assert; use googletest::any; use prost::Message; +use restate_storage_api::invocation_status_table::JournalMetadata; use restate_storage_api::journal_table::JournalTable; use restate_storage_api::timer_table::{Timer, TimerKey, TimerKeyKind, TimerTable}; use restate_types::identifiers::EntryIndex; use restate_types::invocation::TerminationFlavor; use restate_types::journal::enriched::EnrichedEntryHeader; use restate_types::service_protocol; +use rstest::rstest; use test_log::test; -#[test(tokio::test)] -async fn kill_inboxed_invocation() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into())] +#[case(EnumSet::empty())] +#[tokio::test] +async fn kill_inboxed_invocation( + #[case] experimental_features: EnumSet, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_virtual_object(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -76,41 +83,102 @@ async fn kill_inboxed_invocation() -> anyhow::Result<()> { // assert that invocation status was removed assert!(let InvocationStatus::Free = current_invocation_status); - fn outbox_message_matcher( - caller_id: InvocationId, - ) -> impl Matcher { - pat!( - restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( - restate_types::invocation::InvocationResponse { - id: eq(caller_id), - entry_index: eq(0), - result: eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) - } - )) - ) - } - assert_that!( actions, - contains(pat!(Action::NewOutboxMessage { - message: outbox_message_matcher(caller_id) - })) + contains( + matchers::actions::invocation_response_to_partition_processor( + caller_id, + 0, + eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + ) + ) ); let outbox_message = test_env.storage().get_next_outbox_message(0).await?; assert_that!( outbox_message, - some((ge(0), outbox_message_matcher(caller_id))) + some(( + ge(0), + matchers::outbox::invocation_response_to_partition_processor( + caller_id, + 0, + eq(ResponseResult::Failure(KILLED_INVOCATION_ERROR)) + ) + )) ); test_env.shutdown().await; Ok(()) } -#[test(tokio::test)] -async fn kill_call_tree() -> anyhow::Result<()> { - let mut test_env = TestEnv::create().await; +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into(), TerminationFlavor::Kill)] +#[case(EnumSet::empty(), TerminationFlavor::Kill)] +#[case(EnumSet::empty(), TerminationFlavor::Cancel)] +#[tokio::test] +async fn terminate_scheduled_invocation( + #[case] experimental_features: EnumSet, + #[case] termination_flavor: TerminationFlavor, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; + + let invocation_id = InvocationId::mock_random(); + let rpc_id = PartitionProcessorRpcRequestId::new(); + + let _ = test_env + .apply(Command::Invoke(ServiceInvocation { + invocation_id, + execution_time: Some(MillisSinceEpoch::MAX), + response_sink: Some(ServiceInvocationResponseSink::ingress(rpc_id)), + ..ServiceInvocation::mock() + })) + .await; + + // assert that inboxed invocation is in invocation_status + let current_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Scheduled(_) = current_invocation_status); + + let actions = test_env + .apply(Command::TerminateInvocation(InvocationTermination { + invocation_id, + flavor: termination_flavor, + })) + .await; + assert_that!( + actions, + contains(pat!(Action::IngressResponse { + request_id: eq(rpc_id), + invocation_id: some(eq(invocation_id)), + response: eq(IngressResponseResult::Failure(match termination_flavor { + TerminationFlavor::Kill => KILLED_INVOCATION_ERROR, + TerminationFlavor::Cancel => CANCELED_INVOCATION_ERROR, + })) + })) + ); + + // assert that invocation status was removed + let current_invocation_status = test_env + .storage() + .get_invocation_status(&invocation_id) + .await?; + assert!(let InvocationStatus::Free = current_invocation_status); + + test_env.shutdown().await; + Ok(()) +} + +#[rstest] +#[case(ExperimentalFeature::InvocationStatusKilled.into())] +#[case(EnumSet::empty())] +#[tokio::test] +async fn kill_call_tree( + #[case] experimental_features: EnumSet, +) -> anyhow::Result<()> { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let call_invocation_id = InvocationId::mock_random(); let background_call_invocation_id = InvocationId::mock_random(); @@ -170,31 +238,23 @@ async fn kill_call_tree() -> anyhow::Result<()> { ))) .await; - // Invocation should be gone - assert_that!( - test_env - .storage - .get_invocation_status(&invocation_id) - .await?, - pat!(InvocationStatus::Free) - ); - assert_that!( - test_env - .storage - .get_journal(&invocation_id, 4) - .try_collect::>() - .await?, - empty() - ); + let abort_command_matcher = + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + pat!(Action::AbortInvocation { + invocation_id: eq(invocation_id), + acknowledge: eq(true) + }) + } else { + pat!(Action::AbortInvocation { + invocation_id: eq(invocation_id), + acknowledge: eq(false) + }) + }; assert_that!( actions, all!( - contains(pat!(Action::AbortInvocation(eq(invocation_id)))), - contains(pat!(Action::Invoke { - invocation_id: eq(enqueued_invocation_id_on_same_target), - invocation_target: eq(invocation_target) - })), + contains(abort_command_matcher), contains(matchers::actions::terminate_invocation( call_invocation_id, TerminationFlavor::Kill @@ -213,6 +273,92 @@ async fn kill_call_tree() -> anyhow::Result<()> { }))) ) ); + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + // We don't pop the inbox yet, but only after invocation ends + assert_that!( + actions, + not(contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target.clone(), + ))) + ) + } else { + // Inbox should have been popped + assert_that!( + actions, + contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target.clone(), + )) + ) + }; + + if experimental_features.contains(ExperimentalFeature::InvocationStatusKilled) { + // A couple of new expectations here: + // * the invocation status is now in killed state + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Killed { .. }) + ); + + // * No new journal entries will be accepted! + let _ = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::JournalEntry { + entry_index: 4, + entry: ProtobufRawEntryCodec::serialize_enriched(Entry::ClearAllState), + }, + })) + .await; + // Journal entry was ignored (journal length == 4) + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Killed(pat!(InFlightInvocationMetadata { + journal_metadata: pat!(JournalMetadata { length: eq(4) }) + }))) + ); + + // Now send the Failed invoker effect + let actions = test_env + .apply(Command::InvokerEffect(InvokerEffect { + invocation_id, + kind: InvokerEffectKind::Failed(KILLED_INVOCATION_ERROR), + })) + .await; + + // The inbox is popped after the invoker sends failed + assert_that!( + actions, + contains(matchers::actions::invoke_for_id_and_target( + enqueued_invocation_id_on_same_target, + invocation_target + )) + ); + } + + // Invocation should be finally gone + assert_that!( + test_env + .storage + .get_invocation_status(&invocation_id) + .await?, + pat!(InvocationStatus::Free) + ); + assert_that!( + test_env + .storage + .get_journal(&invocation_id, 4) + .try_collect::>() + .await?, + empty() + ); test_env.shutdown().await; Ok(()) diff --git a/crates/worker/src/partition/state_machine/tests/matchers.rs b/crates/worker/src/partition/state_machine/tests/matchers.rs index ccc769ff9..290a7ce7e 100644 --- a/crates/worker/src/partition/state_machine/tests/matchers.rs +++ b/crates/worker/src/partition/state_machine/tests/matchers.rs @@ -52,7 +52,7 @@ pub mod actions { use crate::partition::state_machine::Action; use restate_types::identifiers::InvocationId; - use restate_types::invocation::{InvocationResponse, ResponseResult}; + use restate_types::invocation::{InvocationTarget, ResponseResult}; pub fn invoke_for_id(invocation_id: InvocationId) -> impl Matcher { pat!(Action::Invoke { @@ -60,6 +60,16 @@ pub mod actions { }) } + pub fn invoke_for_id_and_target( + invocation_id: InvocationId, + invocation_target: InvocationTarget, + ) -> impl Matcher { + pat!(Action::Invoke { + invocation_id: eq(invocation_id), + invocation_target: eq(invocation_target) + }) + } + pub fn delete_sleep_timer(entry_index: EntryIndex) -> impl Matcher { pat!(Action::DeleteTimer { timer_key: pat!(TimerKey { @@ -109,19 +119,39 @@ pub mod actions { response_result_matcher: impl Matcher + 'static, ) -> impl Matcher { pat!(Action::NewOutboxMessage { - message: pat!( - restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( - InvocationResponse { - id: eq(caller_invocation_id), - entry_index: eq(caller_entry_index), - result: response_result_matcher - } - )) + message: outbox::invocation_response_to_partition_processor( + caller_invocation_id, + caller_entry_index, + response_result_matcher ) }) } } +pub mod outbox { + use super::*; + + use restate_storage_api::outbox_table::OutboxMessage; + use restate_types::identifiers::InvocationId; + use restate_types::invocation::{InvocationResponse, ResponseResult}; + + pub fn invocation_response_to_partition_processor( + caller_invocation_id: InvocationId, + caller_entry_index: EntryIndex, + response_result_matcher: impl Matcher + 'static, + ) -> impl Matcher { + pat!( + restate_storage_api::outbox_table::OutboxMessage::ServiceResponse(pat!( + InvocationResponse { + id: eq(caller_invocation_id), + entry_index: eq(caller_entry_index), + result: response_result_matcher + } + )) + ) + } +} + pub fn completion( entry_index: EntryIndex, completion_result: CompletionResult, diff --git a/crates/worker/src/partition/state_machine/tests/mod.rs b/crates/worker/src/partition/state_machine/tests/mod.rs index 9e68666ce..7881b0459 100644 --- a/crates/worker/src/partition/state_machine/tests/mod.rs +++ b/crates/worker/src/partition/state_machine/tests/mod.rs @@ -27,7 +27,6 @@ use ::tracing::info; use bytes::Bytes; use bytestring::ByteString; use futures::{StreamExt, TryStreamExt}; -use googletest::matcher::Matcher; use googletest::{all, assert_that, pat, property}; use restate_core::{task_center, TaskCenter, TaskCenterBuilder}; use restate_invoker_api::{EffectKind, InvokeInputJournal}; @@ -84,16 +83,18 @@ impl TestEnv { } pub async fn create() -> Self { - Self::create_with_options(false).await + Self::create_with_experimental_features(Default::default()).await } - pub async fn create_with_options(disable_idempotency_table: bool) -> Self { + pub async fn create_with_experimental_features( + experimental_features: EnumSet, + ) -> Self { Self::create_with_state_machine(StateMachine::new( 0, /* inbox_seq_number */ 0, /* outbox_seq_number */ None, /* outbox_head_seq_number */ PartitionKey::MIN..=PartitionKey::MAX, - disable_idempotency_table, + experimental_features, )) .await } @@ -967,7 +968,7 @@ async fn truncate_outbox_with_gap() -> Result<(), Error> { outbox_tail_index, Some(outbox_head_index), PartitionKey::MIN..=PartitionKey::MAX, - false, + EnumSet::empty(), )) .await; diff --git a/crates/worker/src/partition/state_machine/tests/workflow.rs b/crates/worker/src/partition/state_machine/tests/workflow.rs index 8c9c60b5b..dd94e28ac 100644 --- a/crates/worker/src/partition/state_machine/tests/workflow.rs +++ b/crates/worker/src/partition/state_machine/tests/workflow.rs @@ -20,11 +20,11 @@ use rstest::*; use std::time::Duration; #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn start_workflow_method(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn start_workflow_method(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -52,7 +52,7 @@ async fn start_workflow_method(#[case] disable_idempotency_table: bool) { ); // Assert service is locked only if we enable the idempotency table - if disable_idempotency_table { + if experimental_features.contains(ExperimentalFeature::DisableIdempotencyTable) { assert_that!( test_env .storage() @@ -184,11 +184,11 @@ async fn start_workflow_method(#[case] disable_idempotency_table: bool) { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn attach_by_workflow_key(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn attach_by_workflow_key(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_generate(&invocation_target); @@ -322,11 +322,11 @@ async fn attach_by_workflow_key(#[case] disable_idempotency_table: bool) { } #[rstest] -#[case(true)] -#[case(false)] +#[case(ExperimentalFeature::DisableIdempotencyTable.into())] +#[case(EnumSet::empty())] #[tokio::test] -async fn purge_completed_workflow(#[case] disable_idempotency_table: bool) { - let mut test_env = TestEnv::create_with_options(disable_idempotency_table).await; +async fn purge_completed_workflow(#[case] experimental_features: EnumSet) { + let mut test_env = TestEnv::create_with_experimental_features(experimental_features).await; let invocation_target = InvocationTarget::mock_workflow(); let invocation_id = InvocationId::mock_random();