Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion bt_servant_engine/apps/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from fastapi import FastAPI

from bt_servant_engine.apps.api.middleware import CorrelationIdMiddleware
from bt_servant_engine.apps.api.routes import admin, health, webhooks
from bt_servant_engine.apps.api.routes import admin, admin_logs, health, webhooks
from bt_servant_engine.core.logging import get_logger
from bt_servant_engine.services.brain_orchestrator import create_brain
from bt_servant_engine.services import ServiceContainer
Expand Down Expand Up @@ -50,6 +50,7 @@ def create_app(services: ServiceContainer | None = None) -> FastAPI:
app.add_middleware(CorrelationIdMiddleware)

app.include_router(health.router)
app.include_router(admin_logs.router)
app.include_router(admin.router)
app.include_router(webhooks.router)
return app
Expand Down
229 changes: 229 additions & 0 deletions bt_servant_engine/apps/api/routes/admin_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
"""Administrative routes for serving BT-Servant log files."""

from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Iterator, TypedDict, cast

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse

from bt_servant_engine.apps.api.dependencies import require_admin_token
from bt_servant_engine.core.logging import LOGS_DIR

router = APIRouter()

LOG_SUFFIX = ".log"
LOG_CHUNK_SIZE = 64 * 1024
RECENT_DAYS_MIN = 1
RECENT_DAYS_MAX = 90
RECENT_LIMIT_MIN = 1
RECENT_LIMIT_MAX = 500


class LogFileEntry(TypedDict):
"""Serialized metadata for a single log file."""

name: str
size_bytes: int
modified_at: str
created_at: str


class LogFilesPayload(TypedDict):
"""Response payload returned by log listing endpoints."""

files: list[LogFileEntry]
total_files: int
total_size_bytes: int


def _isoformat_utc(timestamp: float) -> str:
"""Encode a POSIX timestamp as an ISO-8601 UTC string."""
return datetime.fromtimestamp(timestamp, tz=timezone.utc).isoformat().replace("+00:00", "Z")


def _iter_log_files() -> Iterator[Path]:
"""Yield .log files located in the configured logs directory."""
try:
yield from (
entry for entry in LOGS_DIR.iterdir() if entry.is_file() and entry.suffix == LOG_SUFFIX
)
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log directory not found"
) from exc
except PermissionError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log directory not accessible"
) from exc
except OSError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Unable to read log directory",
) from exc


def _build_log_entry(file_path: Path, stat_result: os.stat_result) -> LogFileEntry:
"""Return serialized metadata for a log file."""
return cast(
LogFileEntry,
{
"name": file_path.name,
"size_bytes": stat_result.st_size,
"modified_at": _isoformat_utc(stat_result.st_mtime),
"created_at": _isoformat_utc(stat_result.st_ctime),
},
)


def _validated_log_path(filename: str) -> Path:
"""Validate the requested filename and return its resolved Path."""
if not filename or filename.endswith("/") or filename.endswith("\\"):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid log filename")
if any(sep in filename for sep in ("/", "\\")) or ".." in filename:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid log filename")
if not filename.endswith(LOG_SUFFIX):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid log filename")

candidate = LOGS_DIR / filename
try:
candidate_resolved = candidate.resolve()
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log file not found"
) from exc
except PermissionError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log file not accessible"
) from exc

if candidate_resolved.parent != LOGS_DIR.resolve():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid log filename")
if not candidate_resolved.exists() or not candidate_resolved.is_file():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Log file not found")
return candidate_resolved


def _stream_file(path: Path) -> Iterator[bytes]:
"""Yield file contents in fixed-size chunks."""
with path.open("rb") as handle:
while True:
chunk = handle.read(LOG_CHUNK_SIZE)
if not chunk:
break
yield chunk


def _collect_log_entries() -> LogFilesPayload:
"""Return serialized metadata for all log files."""
files: list[LogFileEntry] = []
total_size = 0

try:
ordered_entries = sorted(
_iter_log_files(), key=lambda entry: entry.stat().st_mtime, reverse=True
)
except OSError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Unable to read log directory",
) from exc

for file_path in ordered_entries:
try:
stat_result = file_path.stat()
except FileNotFoundError:
# File was removed between listing and stat; skip gracefully.
continue
except PermissionError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log file not accessible"
) from exc
except OSError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Unable to read log file metadata",
) from exc

files.append(_build_log_entry(file_path, stat_result))
total_size += stat_result.st_size

return cast(
LogFilesPayload,
{"files": files, "total_files": len(files), "total_size_bytes": total_size},
)


@router.get("/admin/logs/files")
async def list_log_files(_: None = Depends(require_admin_token)) -> LogFilesPayload:
"""Return metadata for available log files."""
return _collect_log_entries()


@router.get("/admin/logs/files/{filename}")
async def download_log_file(filename: str, _: None = Depends(require_admin_token)):
"""Stream the requested log file to the caller."""
path = _validated_log_path(filename)
try:
stat_result = path.stat()
except FileNotFoundError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log file not found"
) from exc
except PermissionError as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND, detail="Log file not accessible"
) from exc
except OSError as exc:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Unable to read log file"
) from exc

headers = {
"Content-Disposition": f'attachment; filename="{path.name}"',
"Content-Length": str(stat_result.st_size),
}
return StreamingResponse(
_stream_file(path),
media_type="text/plain; charset=utf-8",
headers=headers,
)


@router.get("/admin/logs/recent")
async def list_recent_logs(
days: int = 7,
limit: int = 100,
_: None = Depends(require_admin_token),
) -> LogFilesPayload:
"""Return log files modified within the requested timeframe."""
if days < RECENT_DAYS_MIN or days > RECENT_DAYS_MAX:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(f"Parameter 'days' must be between {RECENT_DAYS_MIN} and {RECENT_DAYS_MAX}"),
)
if limit < RECENT_LIMIT_MIN or limit > RECENT_LIMIT_MAX:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(f"Parameter 'limit' must be between {RECENT_LIMIT_MIN} and {RECENT_LIMIT_MAX}"),
)

cutoff = datetime.now(timezone.utc) - timedelta(days=days)
base_payload = _collect_log_entries()
filtered = [
entry
for entry in base_payload["files"]
if datetime.fromisoformat(entry["modified_at"].replace("Z", "+00:00")) >= cutoff
][:limit]

total_size = sum(entry["size_bytes"] for entry in filtered)
return cast(
LogFilesPayload,
{"files": filtered, "total_files": len(filtered), "total_size_bytes": total_size},
)


__all__ = ["router"]
2 changes: 1 addition & 1 deletion docs/bt-servant-log-api-spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ No new secrets, OAuth flows, or middleware are introduced. The same token gates

## 3. Endpoint Reference

The routes live in the admin router (`bt_servant_engine/apps/api/routes/admin.py`) to share middleware and auth. To avoid conflicts, we namespace them under `/admin/logs`.
The routes live alongside the existing admin endpoints in `bt_servant_engine/apps/api/routes/admin_logs.py` and are included by the API factory with the same admin-token dependency. To avoid conflicts, we namespace them under `/admin/logs`.

### 3.1 List Available Log Files

Expand Down
118 changes: 118 additions & 0 deletions tests/apps/api/routes/test_admin_log_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""Tests for the admin log retrieval endpoints."""

from __future__ import annotations

import os
from datetime import datetime, timedelta, timezone
from http import HTTPStatus
from typing import Tuple
from urllib.parse import quote

import pytest
from fastapi.testclient import TestClient

from bt_servant_engine.apps.api.app import create_app
from bt_servant_engine.apps.api.routes import admin_logs
from bt_servant_engine.bootstrap import build_default_service_container
from bt_servant_engine.core.config import config as app_config
import bt_servant_engine.core.logging as core_logging

ADMIN_TOKEN = "secret-token"
EXPECTED_FILE_COUNT = 2


def _auth_headers() -> dict[str, str]:
return {"Authorization": f"Bearer {ADMIN_TOKEN}"}


@pytest.fixture(name="logs_client")
def _logs_client(monkeypatch, tmp_path) -> Tuple[TestClient, os.PathLike[str]]:
"""Provision a TestClient with admin auth and a temporary logs directory."""

monkeypatch.setattr(app_config, "ENABLE_ADMIN_AUTH", True, raising=True)
monkeypatch.setattr(app_config, "ADMIN_API_TOKEN", ADMIN_TOKEN, raising=True)

monkeypatch.setattr(core_logging, "LOGS_DIR", tmp_path, raising=True)
monkeypatch.setattr(core_logging, "LOG_FILE_PATH", tmp_path / "bt_servant.log", raising=True)
monkeypatch.setattr(admin_logs, "LOGS_DIR", tmp_path, raising=True)

client = TestClient(create_app(build_default_service_container()))
return client, tmp_path


def test_list_logs_returns_sorted_entries(logs_client):
"""List endpoint returns sorted metadata and totals."""
client, logs_dir = logs_client

older_log = logs_dir / "older.log"
older_log.write_text("older")
older_ts = (datetime.now(timezone.utc) - timedelta(days=2)).timestamp()
os.utime(older_log, (older_ts, older_ts))

recent_log = logs_dir / "recent.log"
recent_log.write_text("recent")

resp = client.get("/admin/logs/files", headers=_auth_headers())
assert resp.status_code == HTTPStatus.OK
payload = resp.json()

assert payload["total_files"] == EXPECTED_FILE_COUNT
assert payload["total_size_bytes"] == sum(
path.stat().st_size for path in (older_log, recent_log)
)
names = [entry["name"] for entry in payload["files"]]
assert names == ["recent.log", "older.log"]
assert payload["files"][0]["modified_at"].endswith("Z")


def test_log_endpoints_require_authentication(logs_client):
"""Missing credentials should yield 401."""
client, logs_dir = logs_client
(logs_dir / "sample.log").write_text("data")

resp = client.get("/admin/logs/files")
assert resp.status_code == HTTPStatus.UNAUTHORIZED


def test_download_log_file_streams_content(logs_client):
"""Downloading a log streams raw text with headers."""
client, logs_dir = logs_client
log_path = logs_dir / "example.log"
log_path.write_text("line1\nline2\n")

resp = client.get("/admin/logs/files/example.log", headers=_auth_headers())
assert resp.status_code == HTTPStatus.OK
assert resp.text == "line1\nline2\n"
assert resp.headers["content-disposition"].endswith('example.log"')
assert resp.headers["content-length"] == str(log_path.stat().st_size)


def test_download_rejects_path_traversal(logs_client):
"""Guard against path traversal attempts."""
client, logs_dir = logs_client
(logs_dir / "valid.log").write_text("content")

encoded = quote("../secret.log", safe="")
resp = client.get(f"/admin/logs/files/{encoded}", headers=_auth_headers())
assert resp.status_code in {HTTPStatus.BAD_REQUEST, HTTPStatus.NOT_FOUND}


def test_recent_logs_filters_by_days_and_limit(logs_client):
"""Recent endpoint respects day window and limit."""
client, logs_dir = logs_client

old_log = logs_dir / "old.log"
old_log.write_text("old")
old_ts = (datetime.now(timezone.utc) - timedelta(days=10)).timestamp()
os.utime(old_log, (old_ts, old_ts))

new_log = logs_dir / "new.log"
new_log.write_text("new")

resp = client.get("/admin/logs/recent?days=7&limit=1", headers=_auth_headers())
assert resp.status_code == HTTPStatus.OK
payload = resp.json()

assert payload["total_files"] == 1
assert payload["files"][0]["name"] == "new.log"
assert payload["total_size_bytes"] == new_log.stat().st_size