Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions fastdeploy/engine/async_llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import zmq

from fastdeploy.engine.args_utils import EngineArgs
from fastdeploy.engine.common_engine import EngineService
from fastdeploy.engine.engine_service_factory import create_engine_service
from fastdeploy.engine.request import Request, RequestOutput
from fastdeploy.engine.sampling_params import SamplingParams
from fastdeploy.entrypoints.openai.utils import DealerConnectionManager
Expand Down Expand Up @@ -141,7 +141,7 @@ def signal_handler(signum, frame):
signal.signal(signal.SIGINT, signal_handler)

try:
engine = EngineService(self.cfg, use_async_llm=True)
engine = create_engine_service(self.cfg, use_async_llm=True)
# Start engine with ZMQ service
engine.start(async_llm_pid=self.engine_pid)

Expand Down
50 changes: 50 additions & 0 deletions fastdeploy/engine/common_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,23 @@
from fastdeploy.trace.trace_logger import print as trace_print
from fastdeploy.utils import EngineError, console_logger, envs, get_logger, llm_logger

# Import I/O capture module for testing and verification
try:
    from fastdeploy.engine.io_capture import (
        IOTypes,
        enable_capture,
        get_global_capture,
        is_capture_enabled,
    )

    llm_logger.info("I/O capture module loaded")
except ImportError:
    # Fallback no-op implementations when the io_capture module is not
    # available, so callers can use the same API without feature checks.
    # PEP 8 (E731): use `def` rather than lambda assignments so the
    # fallbacks have proper names in tracebacks and are picklable.
    IOTypes = None

    def enable_capture(*args, **kwargs):
        """No-op fallback: capture cannot be enabled; accepts any args."""
        return None

    def is_capture_enabled():
        """Fallback: capture is never considered enabled."""
        return False

    def get_global_capture():
        """Fallback: no global capture instance exists."""
        return None

try:
TokenProcessor = load_token_processor_plugins()
llm_logger.info(f"TokenProcessor plugin {TokenProcessor} loaded")
Expand Down Expand Up @@ -183,6 +200,17 @@ def __init__(self, cfg, start_queue=True, use_async_llm=False):
self.ipc_signal_suffix = None
self.cache_manager_processes = None

# Initialize I/O capture if enabled via environment variable
if envs.FD_ENABLE_ENGINE_IO_CAPTURE == "1":
output_dir = getattr(envs, "FD_ENGINE_IO_CAPTURE_DIR", "./captured_io")
if self.cfg.parallel_config.data_parallel_size > 1:
output_dir = f"{output_dir}/dp{self.cfg.parallel_config.local_data_parallel_id}"
enable_capture(output_dir)
capture = get_global_capture()
capture.set_config(self.cfg)
capture.save_config_snapshot()
self.llm_logger.info(f"Engine I/O capture enabled, output dir: {output_dir}")

self._finalizer = weakref.finalize(self, self._exit_sub_services)

def start(self, async_llm_pid=None):
Expand Down Expand Up @@ -525,6 +553,12 @@ def insert_tasks(self, tasks: List[Request], current_id=-1):
self.update_requests_chunk_size(tasks)
else:
self.update_mm_requests_chunk_size(tasks)

# Capture tasks sent to worker if I/O capture is enabled
if is_capture_enabled():
capture = get_global_capture()
capture.capture_worker_task(tasks, self.resource_manager.real_bsz)

self.engine_worker_queue.put_tasks((tasks, self.resource_manager.real_bsz))
return True

Expand Down Expand Up @@ -764,6 +798,11 @@ def _schedule_request_to_worker(self):
max_num_batched_tokens=self.cfg.scheduler_config.max_num_batched_tokens,
batch=num_prefill_batch,
)
# Capture tasks from scheduler if I/O capture is enabled
if is_capture_enabled():
capture = get_global_capture()
capture.capture_schedule_task(tasks, current_id)

tasks = [task for task in tasks if task.request_id not in self.resource_manager.abort_req_ids_set]
for task in tasks:
task.metrics.engine_get_req_time = time.time()
Expand Down Expand Up @@ -2042,6 +2081,17 @@ def _start_worker_service(self):
pd_cmd = pd_cmd + f" --ips {ips} --nnodes {len(self.cfg.ips)}"
pd_cmd = pd_cmd + arguments + f" 2>{log_dir}/launch_worker.log"
self.llm_logger.info(f"Launch worker service command: {pd_cmd}")

# Capture launch parameters for verification if enabled
try:
from fastdeploy.engine.components.launch_param_capture import (
capture_if_enabled,
)

capture_if_enabled(pd_cmd, "old", log_dir)
except ImportError:
pass

p = subprocess.Popen(
pd_cmd,
stdout=subprocess.PIPE,
Expand Down
29 changes: 29 additions & 0 deletions fastdeploy/engine/components/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Engine components for the new modular architecture.

Re-exports the component classes so callers can import them directly from
``fastdeploy.engine.components`` instead of the individual submodules.
"""

from fastdeploy.engine.components.ipc_manager import IPCManager
from fastdeploy.engine.components.process_manager import ProcessManager
from fastdeploy.engine.components.resource_coordinator import ResourceCoordinator
from fastdeploy.engine.components.scheduler_coordinator import SchedulerCoordinator

# Public API of this package; `from ... import *` exposes exactly these names.
__all__ = [
    "IPCManager",
    "ProcessManager",
    "ResourceCoordinator",
    "SchedulerCoordinator",
]
Loading
Loading