Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions fastdeploy/splitwise/internal_adapter_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
# **Note**: Just for internal use
import zmq

from fastdeploy.inter_communicator import ZmqTcpServer
from fastdeploy.engine.request import RequestStatus
from fastdeploy.inter_communicator import ZmqIpcClient, ZmqTcpServer
from fastdeploy.metrics.metrics import get_filtered_metrics
from fastdeploy.utils import envs, get_logger

Expand All @@ -45,6 +46,25 @@ def __init__(self, cfg, engine, dp_rank):
target=self._response_external_module_control_instruct, daemon=True
)
self.response_external_instruct_thread.start()
self._create_abort_request_client()

def _create_abort_request_client(self):
    """Create and connect the ZMQ PUSH client used for abort requests.

    The client is built from ``envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT`` and
    stored on ``self.abort_client`` so that ``_send_abort_request`` can
    reuse it for every request it forwards.
    """
    client = ZmqIpcClient(envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT, zmq.PUSH)
    # Publish the client on the instance before connecting, so the attribute
    # exists even if connect() raises.
    self.abort_client = client
    client.connect()
    logger.debug(f"Abort request client connected to port {envs.FD_ZMQ_RECV_REQUEST_SERVER_PORT}")

def _send_abort_request(self, req_id: str) -> bool:
    """Push an abort message for one request through ``self.abort_client``.

    Args:
        req_id: Identifier of the request to abort.

    Returns:
        True if the message was sent and logged without raising; False if
        any exception occurred (the error is logged, never propagated).
    """
    try:
        payload = {
            "request_id": req_id,
            "status": RequestStatus.ABORT.value,
        }
        self.abort_client.send_json(payload)
        logger.debug(f"Sent abort request for req_id: {req_id}")
    except Exception as exc:
        # Best-effort: the caller only needs to know this id was not sent.
        logger.error(f"Failed to send abort request for req_id {req_id}: {exc}")
        return False
    return True

def _get_current_server_info(self):
"""
Expand Down Expand Up @@ -104,7 +124,21 @@ def _recv_external_module_control_instruct(self):
logger.debug(f"Response for task: {task_id_str}: is_health {is_health}")
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)

elif task["cmd"] == "interrupt_requests":
req_ids = task.get("req_ids", [])
interrupted_req_ids = []
for req_id in req_ids:
if self._send_abort_request(req_id):
interrupted_req_ids.append(req_id)
result = {
"task_id": task_id_str,
"result": {
"success": len(req_ids) == 0 or len(interrupted_req_ids) == len(req_ids),
"interrupted_req_ids": interrupted_req_ids,
},
}
with self.response_lock:
self.recv_control_cmd_server.response_for_control_cmd(task_id_str, result)
except Exception as e:
logger.error(f"handle_control_cmd got error: {e}, {traceback.format_exc()!s}")

Expand Down
Loading