Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/reference/feature-servers/python-feature-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,26 @@ requests.post(
"http://localhost:6566/push",
data=json.dumps(push_data))
```
#### Offline write batching for `/push`

The Python feature server supports configurable batching for the **offline**
portion of writes executed via the `/push` endpoint.

Only the offline part of a push is affected:

- `to: "offline"` → **fully batched**
- `to: "online_and_offline"` → **online written immediately**, **offline batched**
- `to: "online"` → unaffected, always immediate

Enable batching in your `feature_store.yaml`:

```yaml
feature_server:
type: local
offline_push_batching_enabled: true
offline_push_batching_batch_size: 1000
offline_push_batching_batch_interval_seconds: 10
```

### Materializing features

Expand Down
16 changes: 16 additions & 0 deletions docs/reference/feature-store-yaml.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,22 @@ online_store:
* **project\_id** — Optional parameter for the datastore online store. Sets the GCP project id used by Feast, if not set Feast will use the default GCP project id in the local environment.
* **project** — Defines a namespace for the entire feature store. Can be used to isolate multiple deployments in a single installation of Feast.

### feature_server

The `feature_server` block configures the Python Feature Server when it is used
to serve online features and handle `/push` requests. This section is optional
and only applies when running the Python feature server.

An example configuration:

```yaml
feature_server:
type: local
offline_push_batching_enabled: true # Enables batching of offline writes processed by /push. Online writes are unaffected.
offline_push_batching_batch_size: 100 # Maximum number of buffered rows before writing to the offline store.
offline_push_batching_batch_interval_seconds: 5 # Maximum time rows may remain buffered before a forced flush.
```

## Providers

The `provider` field defines the environment in which Feast will execute data flows. As a result, it also determines the default values for other fields.
Expand Down
294 changes: 276 additions & 18 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,29 @@
# Copyright 2025 The Feast Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import os
import sys
import threading
import time
import traceback
from collections import defaultdict
from contextlib import asynccontextmanager
from datetime import datetime
from importlib import resources as importlib_resources
from typing import Any, Dict, List, Optional, Union
from types import SimpleNamespace
from typing import Any, DefaultDict, Dict, List, NamedTuple, Optional, Union

import pandas as pd
import psutil
Expand Down Expand Up @@ -195,6 +211,41 @@ def get_app(
registry_proto = None
shutting_down = False
active_timer: Optional[threading.Timer] = None
# --- Offline write batching config and batcher ---
fs_cfg = getattr(store.config, "feature_server", None)
batching_cfg = None
if fs_cfg is not None:
enabled = getattr(fs_cfg, "offline_push_batching_enabled", False)
batch_size = getattr(fs_cfg, "offline_push_batching_batch_size", None)
batch_interval_seconds = getattr(
fs_cfg, "offline_push_batching_batch_interval_seconds", None
)

if enabled is True:
size_ok = isinstance(batch_size, int) and not isinstance(batch_size, bool)
interval_ok = isinstance(batch_interval_seconds, int) and not isinstance(
batch_interval_seconds, bool
)
if size_ok and interval_ok:
batching_cfg = SimpleNamespace(
enabled=True,
batch_size=batch_size,
batch_interval_seconds=batch_interval_seconds,
)
else:
logger.warning(
"Offline write batching enabled but missing or invalid numeric values; "
"disabling batching (batch_size=%r, batch_interval_seconds=%r)",
batch_size,
batch_interval_seconds,
)

offline_batcher: Optional[OfflineWriteBatcher] = None
if batching_cfg is not None and batching_cfg.enabled is True:
offline_batcher = OfflineWriteBatcher(store=store, cfg=batching_cfg)
logger.debug("Offline write batching is ENABLED")
else:
logger.debug("Offline write batching is DISABLED")

def stop_refresh():
nonlocal shutting_down
Expand All @@ -219,9 +270,13 @@ def async_refresh():
async def lifespan(app: FastAPI):
await store.initialize()
async_refresh()
yield
stop_refresh()
await store.close()
try:
yield
finally:
stop_refresh()
if offline_batcher is not None:
offline_batcher.shutdown()
await store.close()

app = FastAPI(lifespan=lifespan)

Expand Down Expand Up @@ -326,22 +381,58 @@ async def push(request: PushFeaturesRequest) -> None:
for feature_view in fvs_with_push_sources:
assert_permissions(resource=feature_view, actions=actions)

push_params = dict(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
transform_on_write=request.transform_on_write,
)
async def _push_with_to(push_to: PushMode) -> None:
"""
Helper for performing a single push operation.

NOTE:
- Feast providers **do not currently support async offline writes**.
- Therefore:
* ONLINE and ONLINE_AND_OFFLINE → may be async, depending on provider.async_supported.online.write
* OFFLINE → always synchronous, but executed via run_in_threadpool when called from HTTP handlers.
- The OfflineWriteBatcher handles offline writes directly in its own background thread, but the offline store writes are currently synchronous only.
"""
push_source_name = request.push_source_name
allow_registry_cache = request.allow_registry_cache
transform_on_write = request.transform_on_write

# Async currently only applies to online store writes (ONLINE / ONLINE_AND_OFFLINE paths) as theres no async for offline store
if push_to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE) and (
store._get_provider().async_supported.online.write
):
await store.push_async(
push_source_name=push_source_name,
df=df,
allow_registry_cache=allow_registry_cache,
to=push_to,
transform_on_write=transform_on_write,
)
else:
await run_in_threadpool(
lambda: store.push(
push_source_name=push_source_name,
df=df,
allow_registry_cache=allow_registry_cache,
to=push_to,
transform_on_write=transform_on_write,
)
)

should_push_async = (
store._get_provider().async_supported.online.write
and to in [PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE]
)
if should_push_async:
await store.push_async(**push_params)
needs_online = to in (PushMode.ONLINE, PushMode.ONLINE_AND_OFFLINE)
needs_offline = to in (PushMode.OFFLINE, PushMode.ONLINE_AND_OFFLINE)

if offline_batcher is None or not needs_offline:
await _push_with_to(to)
else:
store.push(**push_params)
if needs_online:
await _push_with_to(PushMode.ONLINE)

offline_batcher.enqueue(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
transform_on_write=request.transform_on_write,
)

async def _get_feast_object(
feature_view_name: str, allow_registry_cache: bool
Expand Down Expand Up @@ -683,3 +774,170 @@ def start_server(
)
else:
uvicorn.run(app, host=host, port=port, access_log=(not no_access_log))


class _OfflineBatchKey(NamedTuple):
push_source_name: str
allow_registry_cache: bool
transform_on_write: bool


class OfflineWriteBatcher:
"""
In-process offline write batcher for /push requests.

- Buffers DataFrames per (push_source_name, allow_registry_cache, transform_on_write)
- Flushes when either:
* total rows in a buffer >= batch_size, or
* time since last flush >= batch_interval_seconds
- Flush runs in a dedicated background thread so the HTTP event loop stays unblocked.
"""

def __init__(self, store: "feast.FeatureStore", cfg: Any):
self._store = store
self._cfg = cfg

# Buffers and timestamps keyed by batch key
self._buffers: DefaultDict[_OfflineBatchKey, List[pd.DataFrame]] = defaultdict(
list
)
self._last_flush: DefaultDict[_OfflineBatchKey, float] = defaultdict(time.time)

self._lock = threading.Lock()
self._stop_event = threading.Event()

# Start background flusher thread
self._thread = threading.Thread(
target=self._run, name="offline_write_batcher", daemon=True
)
self._thread.start()

logger.debug(
"OfflineWriteBatcher initialized: batch_size=%s, batch_interval_seconds=%s",
getattr(cfg, "batch_size", None),
getattr(cfg, "batch_interval_seconds", None),
)

# ---------- Public API ----------

def enqueue(
self,
push_source_name: str,
df: pd.DataFrame,
allow_registry_cache: bool,
transform_on_write: bool,
) -> None:
"""
Enqueue a dataframe for offline write, grouped by push source + flags.
Cheap and non-blocking; heavy I/O happens in background thread.
"""
key = _OfflineBatchKey(
push_source_name=push_source_name,
allow_registry_cache=allow_registry_cache,
transform_on_write=transform_on_write,
)

with self._lock:
self._buffers[key].append(df)
total_rows = sum(len(d) for d in self._buffers[key])

# Size-based flush
if total_rows >= self._cfg.batch_size:
logger.debug(
"OfflineWriteBatcher size threshold reached for %s: %s rows",
key,
total_rows,
)
self._flush_locked(key)

def flush_all(self) -> None:
"""
Flush all buffers synchronously. Intended for graceful shutdown.
"""
with self._lock:
keys = list(self._buffers.keys())
for key in keys:
self._flush_locked(key)

def shutdown(self, timeout: float = 5.0) -> None:
"""
Stop the background thread and perform a best-effort flush.
"""
logger.debug("Shutting down OfflineWriteBatcher")
self._stop_event.set()
try:
self._thread.join(timeout=timeout)
except Exception:
logger.exception("Error joining OfflineWriteBatcher thread")

# Best-effort final flush
try:
self.flush_all()
except Exception:
logger.exception("Error during final OfflineWriteBatcher flush")

# ---------- Internal helpers ----------

def _run(self) -> None:
"""
Background loop: periodically checks for buffers that should be flushed
based on time since last flush.
"""
interval = max(1, int(getattr(self._cfg, "batch_interval_seconds", 30)))
logger.debug(
"OfflineWriteBatcher background loop started with check interval=%s",
interval,
)

while not self._stop_event.wait(timeout=interval):
now = time.time()
try:
with self._lock:
for key, dfs in list(self._buffers.items()):
if not dfs:
continue
last = self._last_flush[
key
] # this will also init the default timestamp
age = now - last
if age >= self._cfg.batch_interval_seconds:
logger.debug(
"OfflineWriteBatcher time threshold reached for %s: age=%s",
key,
age,
)
self._flush_locked(key)
except Exception:
logger.exception("Error in OfflineWriteBatcher background loop")

logger.debug("OfflineWriteBatcher background loop exiting")

def _flush_locked(self, key: _OfflineBatchKey) -> None:
"""
Flush a single buffer; caller must hold self._lock.
"""
dfs = self._buffers.get(key)
if not dfs:
return

batch_df = pd.concat(dfs, ignore_index=True)
self._buffers[key].clear()
self._last_flush[key] = time.time()

logger.debug(
"Flushing offline batch for push_source=%s with %s rows",
key.push_source_name,
len(batch_df),
)

# NOTE: offline writes are currently synchronous only, so we call directly
try:
self._store.push(
push_source_name=key.push_source_name,
df=batch_df,
allow_registry_cache=key.allow_registry_cache,
to=PushMode.OFFLINE,
transform_on_write=key.transform_on_write,
)
except Exception:
logger.exception("Error flushing offline batch for %s", key)
Loading
Loading