|
18 | 18 | import signal
|
19 | 19 | import os
|
20 | 20 |
|
21 |
| -from fastapi import APIRouter, Depends, Request, Response, status |
| 21 | +from fastapi import APIRouter, Request, Depends, WebSocket, WebSocketDisconnect, status |
| 22 | +from fastapi.responses import StreamingResponse |
22 | 23 | from fastapi.encoders import jsonable_encoder
|
23 | 24 | from fastapi.routing import Mount
|
| 25 | +from websockets.exceptions import ConnectionClosed, WebSocketException |
| 26 | + |
24 | 27 | from typing import List
|
25 | 28 |
|
26 | 29 | from gns3server.config import Config
|
|
29 | 32 | from gns3server.controller.controller_error import ControllerError, ControllerForbiddenError
|
30 | 33 | from gns3server import schemas
|
31 | 34 |
|
32 |
| -from .dependencies.authentication import get_current_active_user |
| 35 | +from .dependencies.authentication import get_current_active_user, get_current_active_user_from_websocket |
33 | 36 |
|
34 | 37 | import logging
|
35 | 38 |
|
@@ -174,6 +177,57 @@ async def statistics() -> List[dict]:
|
174 | 177 | return compute_statistics
|
175 | 178 |
|
176 | 179 |
|
| 180 | +@router.get("/notifications", dependencies=[Depends(get_current_active_user)]) |
| 181 | +async def controller_http_notifications(request: Request) -> StreamingResponse: |
| 182 | + """ |
| 183 | + Receive controller notifications about the controller from HTTP stream. |
| 184 | + """ |
| 185 | + |
| 186 | + from gns3server.api.server import app |
| 187 | + log.info(f"New client {request.client.host}:{request.client.port} has connected to controller HTTP " |
| 188 | + f"notification stream") |
| 189 | + |
| 190 | + async def event_stream(): |
| 191 | + try: |
| 192 | + with Controller.instance().notification.controller_queue() as queue: |
| 193 | + while not app.state.exiting: |
| 194 | + msg = await queue.get_json(5) |
| 195 | + yield f"{msg}\n".encode("utf-8") |
| 196 | + finally: |
| 197 | + log.info(f"Client {request.client.host}:{request.client.port} has disconnected from controller HTTP " |
| 198 | + f"notification stream") |
| 199 | + return StreamingResponse(event_stream(), media_type="application/json") |
| 200 | + |
| 201 | + |
| 202 | +@router.websocket("/notifications/ws") |
| 203 | +async def controller_ws_notifications( |
| 204 | + websocket: WebSocket, |
| 205 | + current_user: schemas.User = Depends(get_current_active_user_from_websocket) |
| 206 | +) -> None: |
| 207 | + """ |
| 208 | + Receive project notifications about the controller from WebSocket. |
| 209 | + """ |
| 210 | + |
| 211 | + if current_user is None: |
| 212 | + return |
| 213 | + |
| 214 | + log.info(f"New client {websocket.client.host}:{websocket.client.port} has connected to controller WebSocket") |
| 215 | + try: |
| 216 | + with Controller.instance().notification.controller_queue() as queue: |
| 217 | + while True: |
| 218 | + notification = await queue.get_json(5) |
| 219 | + await websocket.send_text(notification) |
| 220 | + except (ConnectionClosed, WebSocketDisconnect): |
| 221 | + log.info(f"Client {websocket.client.host}:{websocket.client.port} has disconnected from controller WebSocket") |
| 222 | + except WebSocketException as e: |
| 223 | + log.warning(f"Error while sending to controller event to WebSocket client: {e}") |
| 224 | + finally: |
| 225 | + try: |
| 226 | + await websocket.close() |
| 227 | + except OSError: |
| 228 | + pass # ignore OSError: [Errno 107] Transport endpoint is not connected |
| 229 | + |
| 230 | + |
177 | 231 | # @Route.post(
|
178 | 232 | # r"/debug",
|
179 | 233 | # description="Dump debug information to disk (debug directory in config directory). Work only for local server",
|
|
0 commit comments