feat: implement batching strategies #3630
base: main
Conversation
Codecov Report
@@             Coverage Diff             @@
##             main    #3630       +/-   ##
===========================================
+ Coverage    0.00%   31.85%   +31.85%
===========================================
  Files         166      146       -20
  Lines       15286    12038     -3248
  Branches        0     1989     +1989
===========================================
+ Hits            0     3835     +3835
+ Misses      15286     7928     -7358
- Partials        0      275      +275
Oh, I'd forgotten about ruff. Man, it checks fast 😅
LGTM in general, just added a note of documentation improvement.
👍
This one is waiting on me to change some naming around, need to get to that.
now = time.time()
w0 = now - queue[0].enqueue_time

if w0 < self.wait:
Add a loop checking for max_batch_size here.
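A minimal sketch of what that loop might look like, with assumed names (`queue` as a list of jobs, `max_batch_size`, and a `dispatch` coroutine; none of these are confirmed as the PR's actual API):

```python
import typing as t

async def drain_full_batches(
    queue: t.List[t.Any],
    max_batch_size: int,
    dispatch: t.Callable[[t.List[t.Any]], t.Awaitable[None]],
) -> None:
    # Hypothetical helper: keep dispatching while a full batch is already queued,
    # so the time-based check (w0 < self.wait) only has to cover partial batches.
    while len(queue) >= max_batch_size:
        batch = queue[:max_batch_size]
        del queue[:max_batch_size]
        await dispatch(batch)
```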
# we are now free to dispatch whenever we like
await self.strategy.wait(self._queue, optimizer, self.max_latency, self.max_batch_size, self.tick_interval)

n = len(self._queue)
n_call_out = min(self.max_batch_size, n)
Move this (and above) logic into strategy.
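One possible shape for that refactor, sketched with assumed names (neither `SizeAwareStrategy` nor this `wait()` signature is confirmed as the PR's final API): the strategy decides both when to stop waiting and how many jobs to take, so the dispatcher no longer computes `n_call_out = min(self.max_batch_size, n)` itself.

```python
import asyncio
import time
import typing as t

class SizeAwareStrategy:
    # Hypothetical sketch only; the real BatchingStrategy interface may differ.
    async def wait(
        self,
        queue: t.List[t.Any],
        max_latency: float,
        max_batch_size: int,
        tick_interval: float,
    ) -> t.List[t.Any]:
        deadline = time.time() + max_latency
        # Poll until a full batch is available or the latency budget runs out...
        while len(queue) < max_batch_size and time.time() < deadline:
            await asyncio.sleep(tick_interval)
        # ...then return the slice to dispatch instead of just a count.
        return queue[: min(max_batch_size, len(queue))]
```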
Had some discussion about this PR with Sauyon. These are the decisions:
# we are not about to cancel the first request,
and latency_0 + dt <= self.max_latency * 0.95
# and waiting will cause average latency to decrease
and n * (wn + dt + optimizer.o_a) <= optimizer.wait * decay
n: the number of requests in the queue,
multiplied by
(wn: predictor of the next request time + dt: the tick time + o_a: the optimizer slope).

^ The above is a measure of how much latency will be added to every request if we wait for a new request and add it to the batch.

That has to be less than

optimizer.wait: the average amount of time a request sits in the queue,
multiplied by
decay: an arbitrary decay value, so that the average wait should hopefully decay over time.
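To make the inequality concrete, here is a small paraphrase of the check with made-up numbers (the variable names follow the comment above; the values are purely illustrative):

```python
def should_keep_waiting(n: int, wn: float, dt: float, o_a: float,
                        avg_wait: float, decay: float = 0.95) -> bool:
    # Keep waiting only if the latency added to every queued request
    # (n * (wn + dt + o_a)) stays below the decayed average queue wait.
    return n * (wn + dt + o_a) <= avg_wait * decay

# Illustrative numbers: 4 queued requests, 5 ms predicted until the next request,
# a 1 ms tick, a 0.5 ms optimizer slope, and a 30 ms average wait so far.
# 4 * (0.005 + 0.001 + 0.0005) = 0.026 <= 0.030 * 0.95 = 0.0285 -> keep waiting.
print(should_keep_waiting(n=4, wn=0.005, dt=0.001, o_a=0.0005, avg_wait=0.030))
```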
…ml#3663)" (bentoml#3680)" This reverts commit bcc10ac.
@bojiang this should be ok to look at for now, broad strokes.
class TargetLatencyStrategy(BatchingStrategy, strategy_id="target_latency"):
    latency: float = 1.

    def __init__(self, options: dict[t.Any, t.Any]):
TODO: typed dict for init.
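That TODO might end up looking something like the following (a sketch only; the key names are taken from the snippets in this thread, not from the final code):

```python
import typing as t

class TargetLatencyOptions(t.TypedDict, total=False):
    # From the snippet above: TargetLatencyStrategy has a `latency` field.
    latency: float

class AdaptiveOptions(t.TypedDict, total=False):
    # From the adaptive strategy_options example below: a `decay` value.
    decay: float
```

`__init__` could then accept `options: TargetLatencyOptions` instead of `dict[t.Any, t.Any]`, so unknown keys show up during type checking.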
strategy_id: str

@abc.abstractmethod
def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):
Suggested change:
- def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):
+ def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[[t.Sequence[Job]], t.Any], dispatch: t.Callable[..., t.Any]):
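For context, the corrected signature would slot into the abstract base roughly like this (a sketch; the `Job` protocol and the `self` parameter are assumptions added to make it self-contained):

```python
import abc
import typing as t

class Job(t.Protocol):
    enqueue_time: float  # assumed field, based on queue[0].enqueue_time above

class BatchingStrategy(abc.ABC):
    strategy_id: str

    @abc.abstractmethod
    def controller(
        self,
        queue: t.Sequence[Job],
        predict_execution_time: t.Callable[[t.Sequence[Job]], t.Any],
        dispatch: t.Callable[..., t.Any],
    ) -> None:
        """Decide when and how often to call dispatch() on a slice of the queue."""
```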
the way that the scheduler chooses a batching window, i.e. the time it waits for requests to combine
them into a batch before dispatching it to begin execution. There are three options:

- target_latency: this strategy waits until it expects the first request received will take around
strategy: adaptive
strategy_options:
  decay: 0.95
Maybe the same pattern as the optimizer?

strategy:
  name: adaptive
  options:
    decay: 0.95
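However it is spelled, the nested form would presumably be resolved along these lines (a hypothetical sketch; `STRATEGY_REGISTRY` and the exact config keys are assumptions, not the PR's code):

```python
import typing as t

# Hypothetical registry mapping strategy_id -> strategy class, assumed to be
# populated by the `strategy_id=...` class keyword seen earlier in the diff.
STRATEGY_REGISTRY: t.Dict[str, t.Any] = {}

def resolve_strategy(conf: t.Dict[str, t.Any]) -> t.Any:
    strategy_conf = conf["strategy"]
    name = strategy_conf["name"]                 # e.g. "adaptive"
    options = strategy_conf.get("options", {})   # e.g. {"decay": 0.95}
    return STRATEGY_REGISTRY[name](options)
```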
Batching Strategy
^^^^^^^^^^^^^^^^^
Docs changes are outdated, correct?
  fallback=functools.partial(
-     ServiceUnavailable, message="process is overloaded"
+     ServiceUnavailable, message="runner process is overloaded"
Let's give more info about which runner and index.
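Something along these lines could carry that information (a hypothetical sketch; the import path for `ServiceUnavailable` and the `runner_name`/`worker_index` parameters are assumptions, not confirmed API):

```python
import functools

from bentoml.exceptions import ServiceUnavailable  # assumed import path

def overload_fallback(runner_name: str, worker_index: int):
    # Hypothetical helper: say which runner and worker index is overloaded,
    # instead of the generic "runner process is overloaded" message.
    return functools.partial(
        ServiceUnavailable,
        message=f"runner '{runner_name}' (worker {worker_index}) process is overloaded",
    )
```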
I think this should be ready for review now if anybody wants to take a look (@bojiang I implemented wait time). Once I add some tests I'll probably factor this into separate commits.
One more thing is to update the documentation. I have read through the marshal refactor and LGTM.
status: We probably want a load test before merging this one in.
Is this likely to be reviewed and merged?
Hello @sauyon! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:
Comment last updated at 2023-09-20 02:31:21 UTC
For more information, see https://pre-commit.ci
This adds a new configuration value, runner.batching.target_latency_ms, which controls how long the dispatcher will wait before beginning to execute requests. It could probably do with a little bit of testing to see how setting it to 0 performs vs leaving it as ~, but for now adding more knobs users can tweak is probably a good thing; I suspect there will be at least a few people who want the behavior of infinite max latency but not long wait times for requests after a burst.

EDIT: This PR has now been updated to provide a strategy option in the configuration, which allows a user to define which strategy they would like to use.

/cc @timliubentoml