Introduce segmented queue in the invoker #482
src/queue/src/segmented_queue.rs
Outdated
@@ -97,10 +125,15 @@ impl<T: Serialize + DeserializeOwned + Send + 'static> SegmentQueue<T> {
        }
    }

    pub async fn dequeue(&mut self) -> Option<T> {
        self.wait_for_non_emptiness().await;
If I understand the code correctly, this call prevents us from entering a busy loop in which the outer select loop always completes on `dequeue` if the queue is empty. Is this correct? If so, I am wondering whether this shouldn't rather be a concern of the caller: the outer select loop could introduce a guard for the `dequeue` branch (disable the branch once `None` was returned, and re-enable it when something is enqueued).
You have understood correctly @tillrohrmann. I didn't know that it is possible to disable branches.
But with this approach we now have two variants, `try_dequeue` and `dequeue`, where the former completes right away and the latter awaits.
That seems useful to have, and it is convenient and cheap for the caller.
> If yes, then I am wondering whether this shouldn't rather be a concern of the caller

I don't agree with this. To me, the changes introduced in this PR make the interface look more like a channel interface, which is IMO clearer and more idiomatic, in particular given that `dequeue` has `async` in its signature. Overall, this reads nicer.
Also, the fact that this future, in the location where we use it, depends on the outer loop is just a detail of how the `SegmentQueue` works now. In the future, hopefully, this will look more like a real channel (as @igalshilman and I discussed), where the sender part is owned directly by the partition processors.
I am not 100% certain, but I think that the `NonEmptyFuture` does not fully work as intended. I think that registering the waker has no effect, and that it would be enough to return `Poll::Pending` if `queue.is_empty()`. The reason is that the future returned from this method captures `&mut self`. Hence, in order to call `enqueue`, one has to drop this future first, which renders the waker useless.
Hmm, I think you're right. In another scenario this most likely won't compile because of the double borrowing of `&mut self`...
Small follow-up question: if `dequeue` waits until the queue is non-empty, then I assume that we don't have to return an `Option<T>` but can return a `T`.
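A minimal sketch of the two variants side by side, under the assumption that the queue can be modeled as a `VecDeque` wrapper (the names `Queue`, `NoopWake`, `poll_once`, and `demo` are hypothetical and not taken from the PR). The non-blocking `try_dequeue` keeps `Option<T>`, while the awaiting `dequeue` can return a plain `T` because it only completes once an item exists.

```rust
use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the segmented queue.
struct Queue<T> {
    items: VecDeque<T>,
}

impl<T> Queue<T> {
    /// Completes right away; `None` means the queue is currently empty.
    fn try_dequeue(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    /// Stays pending while the queue is empty, so the output is `T`, not `Option<T>`.
    async fn dequeue(&mut self) -> T {
        poll_fn(|_cx| match self.items.pop_front() {
            Some(item) => Poll::Ready(item),
            None => Poll::Pending,
        })
        .await
    }
}

/// Waker that does nothing, only used to poll by hand in this sketch.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future>(fut: F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    fut.as_mut().poll(&mut cx)
}

fn demo() -> (Option<u32>, bool, Poll<u32>) {
    let mut queue = Queue { items: VecDeque::new() };
    let immediate = queue.try_dequeue();
    let pending = poll_once(queue.dequeue()).is_pending();
    queue.items.push_back(7);
    let ready = poll_once(queue.dequeue());
    (immediate, pending, ready)
}
```

Note that a future which returns `Poll::Pending` without arranging a wake-up relies on the caller polling it again, for example from a select loop that is re-entered after each enqueue.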
@tillrohrmann you're right, there is no need to capture the waker in this case. Other than that, I'd leave this as-is.
I will also leave any follow-up queue design discussions, which might or might not happen in the future (no pun intended), outside of this scope :-)
> then I assume that we don't have to return a Option but can return a T.

Makes sense: channels use `Option<>` because they can be closed, but that's a concept this queue doesn't have.
So what do you think I should do here? Just roll back the last two commits and use the queue in a busy loop?
Adding a guard on the select arm should prevent the busy loop, I guess.
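As a rough, std-only sketch of the guard idea (all names here are hypothetical, and the loop is driven by a fixed list of inputs instead of a real select): the dequeue branch is only considered while the queue has items, so an empty queue never causes a spin. With `tokio::select!` the same effect would come from an `if` precondition on the dequeue branch.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the segmented queue with a non-blocking dequeue.
struct Queue<T> {
    items: VecDeque<T>,
}

impl<T> Queue<T> {
    fn new() -> Self {
        Self { items: VecDeque::new() }
    }

    fn enqueue(&mut self, item: T) {
        self.items.push_back(item);
    }

    fn try_dequeue(&mut self) -> Option<T> {
        self.items.pop_front()
    }

    fn is_empty(&self) -> bool {
        self.items.is_empty()
    }
}

/// Events the outer loop reacts to (hypothetical).
enum Input {
    Enqueue(u32),
    Tick,
}

/// Returns the processed items and how often the dequeue branch was taken.
fn run_loop(inputs: Vec<Input>) -> (Vec<u32>, usize) {
    let mut queue = Queue::new();
    let mut processed = Vec::new();
    let mut dequeue_polls = 0;

    for input in inputs {
        match input {
            Input::Enqueue(value) => queue.enqueue(value),
            Input::Tick => {}
        }

        // Guard: skip the dequeue branch entirely while the queue is empty.
        if !queue.is_empty() {
            dequeue_polls += 1;
            if let Some(value) = queue.try_dequeue() {
                processed.push(value);
            }
        }
    }

    (processed, dequeue_polls)
}
```

In this sketch the dequeue branch runs once per enqueued item and never on an idle iteration.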
Ok so I'm gonna keep igal's suggested changes for the time being, and just modify the usage here in the invoker
Thanks @slinkydeveloper, from my side the changes look good!
Once Till's comment is resolved either way, +1 to merge.
src/invoker/src/invoker.rs
Outdated
PollNext::Right
});
// Prepare the segmented queue
let mut segmented_input_queue = restate_queue::SegmentQueue::init(tmp_dir, 1_056_784)
Should this value come from the configuration?
If you set it up, it does; otherwise it defaults to a temp dir.
Sorry, I should have been more specific. I mean the queue size.
Ah, sure we can do that if we want to, although I'm not sure this is a knob someone will ever touch.
1e79055 to 7122d3a
84de828 to e332214
Also fix typo.
e332214 to 7ceeed4
Tested with an `in_memory_threshold` of 1 and it works :) @tillrohrmann perhaps do a last pass and then we can merge it.
7ceeed4 to c0ddb64
Thanks for updating this PR @slinkydeveloper. LGTM. +1 for merging.
/// # Temporary directory
///
/// Temporary directory to use for the invoker temporary files.
/// If empty, the system temporary directory will be used instead.
#[cfg_attr(
    feature = "options_schema",
    schemars(with = "String", default = "Options::default_tmp_dir")
)]
tmp_dir: PathBuf,
At some point we should create by default a Restate working directory under which we assemble all the different directories so that people are aware of what Restate writes to disk. Not for now, though.
PollNext::Right
});
// Prepare the segmented queue
let mut segmented_input_queue = SegmentQueue::init(tmp_dir, 1_056_784)
Not needed right now, but we might want to make the in-memory threshold configurable in order to control Restate's memory footprint a bit better. At the moment we still have the problem that invocations can have arbitrary size, though.
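If the threshold were ever exposed, one possible shape is sketched below. This is only an assumption for illustration: `QueueOptions`, `in_memory_threshold`, and `DEFAULT_IN_MEMORY_THRESHOLD` are hypothetical names that do not appear in the PR, and the default simply mirrors the literal currently passed to `SegmentQueue::init`.

```rust
use std::path::PathBuf;

/// Mirrors the literal used in the diff above (hypothetical constant name).
const DEFAULT_IN_MEMORY_THRESHOLD: usize = 1_056_784;

/// Hypothetical options struct; the real invoker `Options` may look different.
struct QueueOptions {
    /// Directory for spilled queue segments.
    tmp_dir: PathBuf,
    /// Assumed meaning: how much is kept in memory before spilling to disk.
    in_memory_threshold: usize,
}

impl Default for QueueOptions {
    fn default() -> Self {
        Self {
            // Fall back to the system temporary directory.
            tmp_dir: std::env::temp_dir(),
            in_memory_threshold: DEFAULT_IN_MEMORY_THRESHOLD,
        }
    }
}

/// Overrides only the threshold and keeps every other default.
fn with_threshold(in_memory_threshold: usize) -> QueueOptions {
    QueueOptions {
        in_memory_threshold,
        ..QueueOptions::default()
    }
}
```

Keeping the current value as the default would mean existing deployments behave the same unless someone sets the option explicitly.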
For the time being, let's keep this value hardcoded. I feel this config option would probably remain untouched forever, so let's make it configurable the day someone actually finds this queue to be problematic. That avoids polluting our configs now with options that are very complicated to understand.
Fix #285