Skip to content

Commit

Permalink
Refactor invoker following the rust conventions (#528)
Browse files Browse the repository at this point in the history
* Moved options out of invoker.rs
* Rename OutputEffect -> Effect, Kind -> EffectKind
* Split the invoker module
* Rename Invoker to Service
* Split the various "parts" of the invoker interface from lib to specific mods
* Introduced interface for InvokerStatusReader
* Simplified the injection of Invoker state through the state machine coordinator.
  • Loading branch information
slinkydeveloper committed Jun 26, 2023
1 parent 001aca9 commit fd1abf3
Show file tree
Hide file tree
Showing 21 changed files with 1,945 additions and 1,975 deletions.
62 changes: 31 additions & 31 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions src/invoker/src/effects.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use restate_common::errors::InvocationError;
use restate_common::types::{EnrichedRawEntry, EntryIndex, ServiceInvocationId};
use std::collections::HashSet;

#[derive(Debug)]
pub struct Effect {
pub service_invocation_id: ServiceInvocationId,
pub kind: EffectKind,
}

#[derive(Debug)]
pub enum EffectKind {
JournalEntry {
entry_index: EntryIndex,
entry: EnrichedRawEntry,
},
Suspended {
waiting_for_completed_entries: HashSet<EntryIndex>,
},
/// This is sent always after [`Self::JournalEntry`] with `OutputStreamEntry`(s).
End,
/// This is sent when the invoker exhausted all its attempts to make progress on the specific invocation.
Failed(InvocationError),
}
67 changes: 67 additions & 0 deletions src/invoker/src/handle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use super::Effect;

use restate_common::types::{
EntryIndex, JournalMetadata, PartitionLeaderEpoch, ServiceInvocationId,
};
use restate_journal::raw::PlainRawEntry;
use restate_journal::Completion;
use std::future::Future;
use tokio::sync::mpsc;

#[derive(Debug, Default)]
pub enum InvokeInputJournal {
#[default]
NoCachedJournal,
CachedJournal(JournalMetadata, Vec<PlainRawEntry>),
}

// TODO move this to restate_errors, we have several copies of this type (e.g. NetworkNotRunning)
#[derive(Debug, thiserror::Error)]
#[error("invoker is not running")]
pub struct ServiceNotRunning;

pub trait ServiceHandle {
type Future: Future<Output = Result<(), ServiceNotRunning>>;

fn invoke(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
journal: InvokeInputJournal,
) -> Self::Future;

fn resume(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
journal: InvokeInputJournal,
) -> Self::Future;

fn notify_completion(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
completion: Completion,
) -> Self::Future;

fn notify_stored_entry_ack(
&mut self,
partition: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
entry_index: EntryIndex,
) -> Self::Future;

fn abort_all_partition(&mut self, partition: PartitionLeaderEpoch) -> Self::Future;

fn abort_invocation(
&mut self,
partition_leader_epoch: PartitionLeaderEpoch,
service_invocation_id: ServiceInvocationId,
) -> Self::Future;

fn register_partition(
&mut self,
partition: PartitionLeaderEpoch,
sender: mpsc::Sender<Effect>,
) -> Self::Future;
}
Loading

0 comments on commit fd1abf3

Please sign in to comment.