Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Side quest: Optimize ReadOnlyInvocationStatusTable#all_invoked_or_killed_invocations #2349

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
18 changes: 18 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,24 @@ jobs:
RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug
testArtifactOutput: sdk-java-kafka-next-gen-integration-test-report

sdk-java-invocation-status-killed:
name: Run SDK-Java integration tests with InvocationStatusKilled
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
secrets: inherit
needs: docker
uses: restatedev/sdk-java/.github/workflows/integration.yaml@main
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
envVars: |
RESTATE_WORKER__EXPERIMENTAL_FEATURE_INVOCATION_STATUS_KILLED=true
RUST_LOG=info,restate_invoker=trace,restate_ingress_http=trace,restate_bifrost=trace,restate_log_server=trace,restate_core::partitions=trace,restate=debug
testArtifactOutput: sdk-java-invocation-status-killed-integration-test-report

sdk-python:
name: Run SDK-Python integration tests
permissions:
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub enum InvocationState {
Running,
Suspended,
BackingOff,
Killed,
Completed,
}

Expand All @@ -142,6 +143,7 @@ impl FromStr for InvocationState {
"suspended" => Self::Suspended,
"backing-off" => Self::BackingOff,
"completed" => Self::Completed,
"killed" => Self::Killed,
_ => Self::Unknown,
})
}
Expand All @@ -157,6 +159,7 @@ impl Display for InvocationState {
InvocationState::Running => write!(f, "running"),
InvocationState::Suspended => write!(f, "suspended"),
InvocationState::BackingOff => write!(f, "backing-off"),
InvocationState::Killed => write!(f, "killed"),
InvocationState::Completed => write!(f, "completed"),
}
}
Expand Down
1 change: 1 addition & 0 deletions cli/src/ui/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ pub fn invocation_status_style(status: InvocationState) -> Style {
InvocationState::Suspended => DStyle::new().dim(),
InvocationState::BackingOff => DStyle::new().red(),
InvocationState::Completed => DStyle::new().blue(),
InvocationState::Killed => DStyle::new().red(),
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/invoker-api/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ pub trait InvokerHandle<SR> {
&mut self,
partition_leader_epoch: PartitionLeaderEpoch,
invocation_id: InvocationId,
// If true, acknowledge the abort. This will generate a Failed effect
acknowledge: bool,
) -> impl Future<Output = Result<(), NotRunningError>> + Send;

fn register_partition(
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub mod test_util {
&mut self,
_partition_leader_epoch: PartitionLeaderEpoch,
_invocation_id: InvocationId,
_acknowledge: bool,
) -> Result<(), NotRunningError> {
Ok(())
}
Expand Down
3 changes: 3 additions & 0 deletions crates/invoker-impl/src/input_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ pub(crate) enum InputCommand<SR> {
Abort {
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
acknowledge: bool,
},

/// Command used to clean up internal state when a partition leader is going away
Expand Down Expand Up @@ -129,11 +130,13 @@ impl<SR: Send> restate_invoker_api::InvokerHandle<SR> for InvokerHandle<SR> {
&mut self,
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
acknowledge: bool,
) -> Result<(), NotRunningError> {
self.input
.send(InputCommand::Abort {
partition,
invocation_id,
acknowledge,
})
.map_err(|_| NotRunningError)
}
Expand Down
22 changes: 17 additions & 5 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub use input_command::ChannelStatusReader;
pub use input_command::InvokerHandle;
use restate_service_client::{AssumeRoleCacheMode, ServiceClient};
use restate_types::deployment::PinnedDeployment;
use restate_types::errors::KILLED_INVOCATION_ERROR;
use restate_types::invocation::InvocationTarget;
use restate_types::schema::service::ServiceMetadataResolver;

Expand Down Expand Up @@ -351,8 +352,8 @@ where
self.handle_register_partition(partition, partition_key_range,
storage_reader, sender);
},
InputCommand::Abort { partition, invocation_id } => {
self.handle_abort_invocation(partition, invocation_id);
InputCommand::Abort { partition, invocation_id, acknowledge } => {
self.handle_abort_invocation(partition, invocation_id, acknowledge).await;
}
InputCommand::AbortAllPartition { partition } => {
self.handle_abort_partition(partition);
Expand Down Expand Up @@ -808,12 +809,13 @@ where
restate.invoker.partition_leader_epoch = ?partition,
)
)]
fn handle_abort_invocation(
async fn handle_abort_invocation(
&mut self,
partition: PartitionLeaderEpoch,
invocation_id: InvocationId,
acknowledge: bool,
) {
if let Some((_, _, mut ism)) = self
if let Some((tx, _, mut ism)) = self
.invocation_state_machine_manager
.remove_invocation(partition, &invocation_id)
{
Expand All @@ -823,6 +825,14 @@ where
ism.abort();
self.quota.unreserve_slot();
self.status_store.on_end(&partition, &invocation_id);
if acknowledge {
let _ = tx
.send(Effect {
invocation_id,
kind: EffectKind::Failed(KILLED_INVOCATION_ERROR),
})
.await;
}
} else {
trace!("Ignoring Abort command because there is no matching partition/invocation");
}
Expand Down Expand Up @@ -1415,7 +1425,9 @@ mod tests {
assert_eq!(*available_slots, 1);

// Abort the invocation
service_inner.handle_abort_invocation(MOCK_PARTITION, invocation_id);
service_inner
.handle_abort_invocation(MOCK_PARTITION, invocation_id, false)
.await;

// Check the quota
let_assert!(InvokerConcurrencyQuota::Limited { available_slots } = &service_inner.quota);
Expand Down
41 changes: 27 additions & 14 deletions crates/partition-store/src/invocation_status_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@ use futures::Stream;
use futures_util::stream;
use restate_rocksdb::RocksDbPerfGuard;
use restate_storage_api::invocation_status_table::{
InvocationStatus, InvocationStatusTable, InvocationStatusV1, ReadOnlyInvocationStatusTable,
InvocationLite, InvocationStatus, InvocationStatusDiscriminants, InvocationStatusTable,
InvocationStatusV1, InvokedOrKilledInvocationStatusLite, ReadOnlyInvocationStatusTable,
};
use restate_storage_api::{Result, StorageError};
use restate_types::identifiers::{InvocationId, InvocationUuid, PartitionKey, WithPartitionKey};
use restate_types::invocation::InvocationTarget;
use restate_types::storage::StorageCodec;
use std::ops::RangeInclusive;
use tracing::trace;
Expand Down Expand Up @@ -169,7 +169,7 @@ fn delete_invocation_status<S: StorageAccess>(storage: &mut S, invocation_id: &I
fn invoked_invocations<S: StorageAccess>(
storage: &mut S,
partition_key_range: RangeInclusive<PartitionKey>,
) -> Vec<Result<(InvocationId, InvocationTarget)>> {
) -> Vec<Result<InvokedOrKilledInvocationStatusLite>> {
let _x = RocksDbPerfGuard::new("invoked-invocations");
let mut invocations = storage.for_each_key_value_in_place(
FullScanPartitionKeyRange::<InvocationStatusKeyV1>(partition_key_range.clone()),
Expand Down Expand Up @@ -239,12 +239,16 @@ fn all_invocation_status<S: StorageAccess>(
fn read_invoked_v1_full_invocation_id(
mut k: &mut &[u8],
v: &mut &[u8],
) -> Result<Option<(InvocationId, InvocationTarget)>> {
) -> Result<Option<InvokedOrKilledInvocationStatusLite>> {
let invocation_id = invocation_id_from_v1_key_bytes(&mut k)?;
let invocation_status = StorageCodec::decode::<InvocationStatusV1, _>(v)
.map_err(|err| StorageError::Generic(err.into()))?;
if let InvocationStatus::Invoked(invocation_meta) = invocation_status.0 {
Ok(Some((invocation_id, invocation_meta.invocation_target)))
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_meta.invocation_target,
is_invoked: true,
}))
} else {
Ok(None)
}
Expand All @@ -253,13 +257,22 @@ fn read_invoked_v1_full_invocation_id(
fn read_invoked_full_invocation_id(
mut k: &mut &[u8],
v: &mut &[u8],
) -> Result<Option<(InvocationId, InvocationTarget)>> {
// TODO this can be improved by simply parsing InvocationTarget and the Status enum
) -> Result<Option<InvokedOrKilledInvocationStatusLite>> {
let invocation_id = invocation_id_from_key_bytes(&mut k)?;
let invocation_status = StorageCodec::decode::<InvocationStatus, _>(v)
let invocation_status = StorageCodec::decode::<InvocationLite, _>(v)
.map_err(|err| StorageError::Generic(err.into()))?;
if let InvocationStatus::Invoked(invocation_meta) = invocation_status {
Ok(Some((invocation_id, invocation_meta.invocation_target)))
if let InvocationStatusDiscriminants::Invoked = invocation_status.status {
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_status.invocation_target,
is_invoked: true,
}))
} else if let InvocationStatusDiscriminants::Killed = invocation_status.status {
Ok(Some(InvokedOrKilledInvocationStatusLite {
invocation_id,
invocation_target: invocation_status.invocation_target,
is_invoked: false,
}))
} else {
Ok(None)
}
Expand All @@ -274,9 +287,9 @@ impl ReadOnlyInvocationStatusTable for PartitionStore {
get_invocation_status(self, invocation_id)
}

fn all_invoked_invocations(
fn all_invoked_or_killed_invocations(
&mut self,
) -> impl Stream<Item = Result<(InvocationId, InvocationTarget)>> + Send {
) -> impl Stream<Item = Result<InvokedOrKilledInvocationStatusLite>> + Send {
stream::iter(invoked_invocations(
self,
self.partition_key_range().clone(),
Expand All @@ -300,9 +313,9 @@ impl<'a> ReadOnlyInvocationStatusTable for PartitionStoreTransaction<'a> {
try_migrate_and_get_invocation_status(self, invocation_id)
}

fn all_invoked_invocations(
fn all_invoked_or_killed_invocations(
&mut self,
) -> impl Stream<Item = Result<(InvocationId, InvocationTarget)>> + Send {
) -> impl Stream<Item = Result<InvokedOrKilledInvocationStatusLite>> + Send {
stream::iter(invoked_invocations(
self,
self.partition_key_range().clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use googletest::prelude::*;
use once_cell::sync::Lazy;
use restate_storage_api::invocation_status_table::{
InFlightInvocationMetadata, InvocationStatus, InvocationStatusTable, InvocationStatusV1,
JournalMetadata, ReadOnlyInvocationStatusTable, StatusTimestamps,
InvokedOrKilledInvocationStatusLite, JournalMetadata, ReadOnlyInvocationStatusTable,
StatusTimestamps,
};
use restate_storage_api::Transaction;
use restate_types::identifiers::{InvocationId, PartitionProcessorRpcRequestId, WithPartitionKey};
Expand Down Expand Up @@ -94,6 +95,19 @@ fn invoked_status(invocation_target: InvocationTarget) -> InvocationStatus {
})
}

fn killed_status(invocation_target: InvocationTarget) -> InvocationStatus {
InvocationStatus::Killed(InFlightInvocationMetadata {
invocation_target,
journal_metadata: JournalMetadata::initialize(ServiceInvocationSpanContext::empty()),
pinned_deployment: None,
response_sinks: HashSet::new(),
timestamps: StatusTimestamps::init(MillisSinceEpoch::new(0)),
source: Source::Ingress(*RPC_REQUEST_ID),
completion_retention_duration: Duration::ZERO,
idempotency_key: None,
})
}

fn suspended_status(invocation_target: InvocationTarget) -> InvocationStatus {
InvocationStatus::Suspended {
metadata: InFlightInvocationMetadata {
Expand Down Expand Up @@ -131,7 +145,7 @@ async fn populate_data<T: InvocationStatusTable>(txn: &mut T) {

txn.put_invocation_status(
&INVOCATION_ID_4,
&invoked_status(INVOCATION_TARGET_4.clone()),
&killed_status(INVOCATION_TARGET_4.clone()),
)
.await;

Expand All @@ -154,22 +168,34 @@ async fn verify_point_lookups<T: InvocationStatusTable>(txn: &mut T) {
txn.get_invocation_status(&INVOCATION_ID_4)
.await
.expect("should not fail"),
invoked_status(INVOCATION_TARGET_4.clone())
killed_status(INVOCATION_TARGET_4.clone())
);
}

async fn verify_all_svc_with_status_invoked<T: InvocationStatusTable>(txn: &mut T) {
let actual = txn
.all_invoked_invocations()
.all_invoked_or_killed_invocations()
.try_collect::<Vec<_>>()
.await
.unwrap();
assert_that!(
actual,
unordered_elements_are![
eq((*INVOCATION_ID_1, INVOCATION_TARGET_1.clone())),
eq((*INVOCATION_ID_2, INVOCATION_TARGET_2.clone())),
eq((*INVOCATION_ID_4, INVOCATION_TARGET_4.clone()))
eq(InvokedOrKilledInvocationStatusLite {
invocation_id: *INVOCATION_ID_1,
invocation_target: INVOCATION_TARGET_1.clone(),
is_invoked: true,
}),
eq(InvokedOrKilledInvocationStatusLite {
invocation_id: *INVOCATION_ID_2,
invocation_target: INVOCATION_TARGET_2.clone(),
is_invoked: true,
}),
eq(InvokedOrKilledInvocationStatusLite {
invocation_id: *INVOCATION_ID_4,
invocation_target: INVOCATION_TARGET_4.clone(),
is_invoked: false,
}),
]
);
}
Expand Down
7 changes: 7 additions & 0 deletions crates/storage-api/proto/dev/restate/storage/v1/domain.proto
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ message InvocationStatusV2 {
INBOXED = 2;
INVOKED = 3;
SUSPENDED = 4;
KILLED = 6;
COMPLETED = 5;
}

Expand Down Expand Up @@ -160,6 +161,12 @@ message InvocationStatusV2 {
ResponseResult result = 18;
}

// Slimmer version of InvocationStatusV2
message InvocationV2Lite {
InvocationStatusV2.Status status = 1;
InvocationTarget invocation_target = 2;
}

// TODO remove this after 1.1
message InvocationStatus {

Expand Down
Loading
Loading