Skip to content

Commit

Permalink
implement batching strategies
Browse files Browse the repository at this point in the history
  • Loading branch information
sauyon committed Mar 16, 2023
1 parent ca5edb7 commit c11fb81
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 23 deletions.
11 changes: 10 additions & 1 deletion src/bentoml/_internal/configuration/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,16 @@ def __init__(
) from None

def _finalize(self):
RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]
RUNNER_CFG_KEYS = [
"batching",
"resources",
"logging",
"metrics",
"timeout",
"strategy",
"strategy_options",
]

global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
custom_runners_cfg = dict(
filter(
Expand Down
2 changes: 2 additions & 0 deletions src/bentoml/_internal/configuration/v1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@
s.Optional("enabled"): bool,
s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
s.Optional("strategy"): str,
s.Optional("strategy_options"): dict,
},
# NOTE: there is a distinction between being unset and None here; if set to 'None'
# in configuration for a specific runner, it will override the global configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,13 @@ runners:
batching:
enabled: true
max_batch_size: 100
# which strategy to use to batch requests
# there are currently two available options:
# - fixed_wait: always wait a fixed time after a request arrives
# - average_wait: intelligently wait a time based on
strategy: intelligent_wait
strategy_options:
decay: 0.95
max_latency_ms: 10000
logging:
access:
Expand Down
118 changes: 98 additions & 20 deletions src/bentoml/_internal/marshal/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import functools
import traceback
import collections
from abc import ABC
from abc import abstractmethod

import numpy as np

Expand Down Expand Up @@ -41,7 +43,7 @@ class Job:

class Optimizer:
"""
Analyse historical data to optimize CorkDispatcher.
Analyze historical data to predict execution time using a simple linear regression on batch size.
"""

N_KEPT_SAMPLE = 50 # amount of outbound info kept for inferring params
Expand Down Expand Up @@ -98,14 +100,97 @@ def trigger_refresh(self):
T_OUT = t.TypeVar("T_OUT")


class CorkDispatcher:
"""
A decorator that:
* wrap batch function
* implement CORK algorithm to cork & release calling of wrapped function
The wrapped function should be an async function.
"""
BATCHING_STRATEGY_REGISTRY = {}


class BatchingStrategy(abc.ABC):
strategy_id: str

@abc.abstractmethod
def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):
pass

def __init_subclass__(cls, strategy_id: str):
BATCHING_STRATEGY_REGISTRY[strategy_id] = cls
cls.strategy_id = strategy_id


class TargetLatencyStrategy(strategy_id="target_latency"):
latency: float = 1

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "latency":
self.latency = options[key] / 1000.0
else:
logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.")

async def wait(queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

while latency_0 < self.latency:
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

await asyncio.sleep(tick_interval)


class FixedWaitStrategy(strategy_id="fixed_wait"):
wait: float = 1

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "wait":
self.wait = options[key] / 1000.0
else:
logger.warning("Strategy 'fixed_wait' ignoring unknown configuration key '{key}'")

async def wait(queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
now = time.time()
w0 = now - queue[0].enqueue_time

if w0 < self.wait:
await asyncio.sleep(self.wait - w0)


class IntelligentWaitStrategy(strategy_id="intelligent_wait"):
decay: float = 0.95

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "decay":
self.decay = options[key]
else:
logger.warning("Strategy 'intelligent_wait' ignoring unknown configuration value")

async def wait(queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
n = len(queue)
now = time.time()
wn = now - queue[-1].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b
while (
# if we don't already have enough requests,
n < max_batch_size
# we are not about to cancel the first request,
and latency_0 + dt <= self.max_latency * 0.95
# and waiting will cause average latency to decrese
and n * (wn + dt + optimizer.o_a) <= optimizer.wait * decay
):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

# wait for additional requests to arrive
await asyncio.sleep(tick_interval)



class Dispatcher:
def __init__(
self,
max_latency_in_ms: int,
Expand Down Expand Up @@ -283,19 +368,11 @@ async def controller(self):
continue
await asyncio.sleep(self.tick_interval)
continue
if (
n < self.max_batch_size
and n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay
):
n = len(self._queue)
now = time.time()
wn = now - self._queue[-1].enqueue_time
latency_0 += dt

# wait for additional requests to arrive
await asyncio.sleep(self.tick_interval)
continue

# we are now free to dispatch whenever we like
await self.strategy.wait(self._queue, optimizer, self.max_latency, self.max_batch_size, self.tick_interval)

n = len(self._queue)
n_call_out = min(self.max_batch_size, n)
# call
self._sema.acquire()
Expand All @@ -306,6 +383,7 @@ async def controller(self):
except Exception as e: # pylint: disable=broad-except
logger.error(traceback.format_exc(), exc_info=e)


async def inbound_call(self, data: t.Any):
now = time.time()
future = self._loop.create_future()
Expand Down
2 changes: 2 additions & 0 deletions src/bentoml/_internal/models/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ def to_runner(
name: str = "",
max_batch_size: int | None = None,
max_latency_ms: int | None = None,
batching_strategy: BatchingStrategy | None = None,
method_configs: dict[str, dict[str, int]] | None = None,
) -> Runner:
"""
Expand All @@ -348,6 +349,7 @@ def to_runner(
models=[self],
max_batch_size=max_batch_size,
max_latency_ms=max_latency_ms,
batching_strategy=batching_strategy,
method_configs=method_configs,
)

Expand Down
32 changes: 30 additions & 2 deletions src/bentoml/_internal/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from abc import ABC
from abc import abstractmethod
from pprint import pprint

import attr
from simple_di import inject
Expand All @@ -19,6 +20,7 @@
from .runner_handle import RunnerHandle
from .runner_handle import DummyRunnerHandle
from ..configuration.containers import BentoMLContainer
from ..marshal.dispatcher import BATCHING_STRATEGY_REGISTRY

if t.TYPE_CHECKING:
from ...triton import Runner as TritonRunner
Expand Down Expand Up @@ -47,6 +49,7 @@ class RunnerMethod(t.Generic[T, P, R]):
config: RunnableMethodConfig
max_batch_size: int
max_latency_ms: int
batching_strategy: BatchingStrategy

def run(self, *args: P.args, **kwargs: P.kwargs) -> R:
return self.runner._runner_handle.run_method(self, *args, **kwargs)
Expand Down Expand Up @@ -159,6 +162,7 @@ def __init__(
models: list[Model] | None = None,
max_batch_size: int | None = None,
max_latency_ms: int | None = None,
batching_strategy: BatchingStrategy | None = None,
method_configs: dict[str, dict[str, int]] | None = None,
) -> None:
"""
Expand All @@ -177,8 +181,9 @@ def __init__(
models: An optional list composed of ``bentoml.Model`` instances.
max_batch_size: Max batch size config for dynamic batching. If not provided, use the default value from
configuration.
max_latency_ms: Max latency config for dynamic batching. If not provided, use the default value from
configuration.
max_latency_ms: Max latency config. If not provided, uses the default value from configuration.
batching_strategy: Batching strategy for dynamic batching. If not provided, uses the default value
from the configuration.
method_configs: A dictionary per method config for this given Runner signatures.
Returns:
Expand Down Expand Up @@ -215,13 +220,31 @@ def __init__(

method_max_batch_size = None
method_max_latency_ms = None
method_batching_strategy = None
if method_name in method_configs:
method_max_batch_size = method_configs[method_name].get(
"max_batch_size"
)
method_max_latency_ms = method_configs[method_name].get(
"max_latency_ms"
)
method_batching_strategy = method_configs[method_name].get(
"batching_strategy"
)

if config["batching"]["strategy"] not in BATCHING_STRATEGY_REGISTRY:
raise BentoMLConfigException(
f"Unknown batching strategy '{config['batching']['strategy']}'. Available strategies are: {','.join(BATCHING_STRATEGY_REGISTRY.keys())}.",
)

try:
default_batching_strategy = BATCHING_STRATEGY_REGISTRY[
config["batching"]["strategy"]
](**config["batching"]["strategy_options"])
except Exception as e:
raise BentoMLConfigException(
f"Initializing strategy '{config['batching']['strategy']}' with configured options ({pprint(config['batching']['strategy_options'])}) failed."
) from e

runner_method_map[method_name] = RunnerMethod(
runner=self,
Expand All @@ -237,6 +260,11 @@ def __init__(
max_latency_ms,
default=config["batching"]["max_latency_ms"],
),
batching_strategy=first_not_none(
method_batching_strategy,
batching_strategy,
default=BATCHING_STRATEGY_REGISTRY[config["batching"]["strategy"]],
),
)

self.__attrs_init__(
Expand Down
1 change: 1 addition & 0 deletions src/bentoml/_internal/server/runner_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
max_batch_size = method.max_batch_size if method.config.batchable else 1
self.dispatchers[method.name] = CorkDispatcher(
max_latency_in_ms=method.max_latency_ms,
batching_strategy=method.batching_strategy,
max_batch_size=max_batch_size,
fallback=functools.partial(
ServiceUnavailable, message="process is overloaded"
Expand Down

0 comments on commit c11fb81

Please sign in to comment.