Skip to content

Commit

Permalink
feat(api): support pagination on list_apps route (#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
hussein-awala authored Jul 8, 2024
1 parent c317033 commit 5206c4e
Showing 1 changed file with 49 additions and 15 deletions.
64 changes: 49 additions & 15 deletions spark_on_k8s/api/apps.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

from fastapi import APIRouter

# ruff: noqa: TCH002
from fastapi.responses import Response
from kubernetes_asyncio.client import CoreV1Api
from pydantic import BaseModel

Expand All @@ -24,25 +27,56 @@ class SparkApp(BaseModel):
spark_history_proxy: bool = False


async def _list_apps(namespace: str, max_per_page: int, continue_from: str) -> tuple[str, list[SparkApp]]:
no_more_items = "no-more-items"
if continue_from == no_more_items:
return no_more_items, []
core_client = CoreV1Api(await KubernetesClientSingleton.client())
search_params = {
"namespace": namespace,
"label_selector": "spark-role=driver",
"limit": max_per_page,
}
if continue_from:
search_params["_continue"] = continue_from
driver_pods = await core_client.list_namespaced_pod(**search_params)
return (
driver_pods.metadata._continue if driver_pods.metadata._continue else no_more_items,
[
SparkApp(
app_id=pod.metadata.labels.get("spark-app-id", pod.metadata.name),
status=get_app_status(pod),
driver_logs=True,
spark_ui_proxy=pod.metadata.labels.get("spark-ui-proxy", False),
)
for pod in driver_pods.items
],
)


@router.get("/list_apps")
async def list_apps_default_namespace() -> list[SparkApp]:
async def list_apps_default_namespace(
response: Response, max_per_page: int = 10, continue_from: str = ""
) -> list[SparkApp]:
"""List spark apps in the default namespace."""
return await list_apps(namespace=APIConfiguration.SPARK_ON_K8S_API_DEFAULT_NAMESPACE)
continue_from, apps = await _list_apps(
namespace=APIConfiguration.SPARK_ON_K8S_API_DEFAULT_NAMESPACE,
max_per_page=max_per_page,
continue_from=continue_from,
)
response.headers["X-Continue"] = continue_from
return apps


@router.get("/list_apps/{namespace}")
async def list_apps(namespace: str) -> list[SparkApp]:
async def list_apps(
response: Response, namespace: str, max_per_page: int = 10, continue_from: str = ""
) -> list[SparkApp]:
"""List spark apps in a namespace."""
core_client = CoreV1Api(await KubernetesClientSingleton.client())
driver_pods = await core_client.list_namespaced_pod(
namespace=namespace, label_selector="spark-role=driver"
continue_from, apps = await _list_apps(
namespace=namespace,
max_per_page=max_per_page,
continue_from=continue_from,
)
return [
SparkApp(
app_id=pod.metadata.labels.get("spark-app-id", pod.metadata.name),
status=get_app_status(pod),
driver_logs=True,
spark_ui_proxy=pod.metadata.labels.get("spark-ui-proxy", False),
)
for pod in driver_pods.items
]
response.headers["X-Continue"] = continue_from
return apps

0 comments on commit 5206c4e

Please sign in to comment.