Skip to content

Commit 591d442

Browse files
authored
[PT-4557] Query jobs (#630)
* Draft implementation * Job querying without sorting * Add sorting support * Clean up
1 parent aba4c27 commit 591d442

File tree

6 files changed

+164
-14
lines changed

6 files changed

+164
-14
lines changed

CHANGELOG.md

+6
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,12 @@ You can check your current version with the following command:
2929
```
3030

3131
For more information, see [UP42 Python package description](https://pypi.org/project/up42-py/).
32+
## 1.0.4a16
33+
34+
**Jun 11, 2024**
35+
36+
- Added job querying to `processing.py`
37+
3238
## 1.0.4a15
3339

3440
**Jun 11, 2024**

pyproject.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "up42-py"
3-
version = "1.0.4a15"
3+
version = "1.0.4a16"
44
description = "Python SDK for UP42, the geospatial marketplace and developer platform."
55
authors = ["UP42 GmbH <[email protected]>"]
66
license = "https://github.com/up42/up42-py/blob/master/LICENSE"

tests/test_processing.py

+67-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import dataclasses
22
import datetime
33
import random
4+
import urllib.parse
45
import uuid
6+
from typing import Any, List, Optional
57
from unittest import mock
68

79
import pystac
@@ -11,7 +13,7 @@
1113

1214
from tests import helpers
1315
from tests.fixtures import fixtures_globals as constants
14-
from up42 import processing
16+
from up42 import processing, utils
1517

1618
PROCESS_ID = "process-id"
1719
VALIDATION_URL = f"{constants.API_HOST}/v2/processing/processes/{PROCESS_ID}/validation"
@@ -34,7 +36,8 @@
3436
)
3537

3638
JOB_ID = str(uuid.uuid4())
37-
GET_JOB_URL = f"{constants.API_HOST}/v2/processing/jobs/{JOB_ID}"
39+
JOBS_URL = f"{constants.API_HOST}/v2/processing/jobs"
40+
JOB_URL = f"{JOBS_URL}/{JOB_ID}"
3841
CREDITS = 1
3942
ACCOUNT_ID = str(uuid.uuid4())
4043
DEFINITION = {
@@ -280,5 +283,66 @@ def test_should_provide_inputs(self, requests_mock: req_mock.Mocker):
280283

281284
class TestJob:
282285
def test_should_get_job(self, requests_mock: req_mock.Mocker):
283-
requests_mock.get(url=GET_JOB_URL, json=JOB_METADATA)
286+
requests_mock.get(url=JOB_URL, json=JOB_METADATA)
284287
assert processing.Job.get(JOB_ID) == JOB
288+
289+
@pytest.mark.parametrize("process_id", [None, [PROCESS_ID]])
@pytest.mark.parametrize("workspace_id", [None, constants.WORKSPACE_ID])
@pytest.mark.parametrize("status", [None, [processing.JobStatus.CAPTURED]])
@pytest.mark.parametrize("min_duration", [None, 1])
@pytest.mark.parametrize("max_duration", [None, 10])
@pytest.mark.parametrize("sort_by", [None, processing.JobSorting.process_id.desc])
def test_should_get_all_jobs(
    self,
    requests_mock: req_mock.Mocker,
    process_id: Optional[List[str]],
    workspace_id: Optional[str],
    status: Optional[List[processing.JobStatus]],
    min_duration: Optional[int],
    max_duration: Optional[int],
    sort_by: Optional[utils.SortingField],
):
    """Job.all should send the given filters and walk both result pages."""
    # Expected query parameters, in the order they are encoded into the URL.
    # Entries whose value is falsy are left out of the query.
    expected_filters = {
        "processID": process_id,
        "workspaceID": workspace_id,
        "status": [entry.value for entry in status] if status else None,
        "minDuration": min_duration,
        "maxDuration": max_duration,
        "sort": str(sort_by) if sort_by else None,
    }
    query_params: dict[str, Any] = {key: value for key, value in expected_filters.items() if value}
    query = urllib.parse.urlencode(query_params)
    first_page_url = f"{JOBS_URL}?{query}" if query else JOBS_URL
    next_page_url = f"{JOBS_URL}/next"

    # First page holds three jobs and links to the next page.
    first_page = {
        "jobs": [JOB_METADATA] * 3,
        "links": [{"rel": "next", "href": next_page_url}],
    }
    # Second page holds two jobs and has no further link, ending the iteration.
    last_page = {
        "jobs": [JOB_METADATA] * 2,
        "links": [],
    }
    requests_mock.get(url=first_page_url, json=first_page)
    requests_mock.get(url=next_page_url, json=last_page)

    jobs = processing.Job.all(
        process_id=process_id,
        workspace_id=workspace_id,
        status=status,
        min_duration=min_duration,
        max_duration=max_duration,
        sort_by=sort_by,
    )
    assert list(jobs) == [JOB] * 5

tests/test_utils.py

+13
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,16 @@ def test_read_json_fails_if_path_not_found(str_or_path):
275275
assert utils.read_json(path_or_dict=str_or_path)
276276

277277
assert str(str_or_path) in str(ex.value)
278+
279+
280+
class TestSortingField:
    """Tests for ``utils.SortingField`` direction switching and string rendering."""

    @pytest.mark.parametrize("ascending", [True, False])
    def test_should_provide_directions(self, ascending: bool):
        # Whatever direction the field starts with, asc/desc must give the
        # same two results.
        expected_asc = utils.SortingField(name="name", ascending=True)
        expected_desc = utils.SortingField(name="name", ascending=False)
        field = utils.SortingField(name="name", ascending=ascending)
        assert field.asc == expected_asc
        assert field.desc == expected_desc

    def test_should_stringify(self):
        field = utils.SortingField(name="name")
        rendered = (str(field.asc), str(field.desc))
        assert rendered == ("name,asc", "name,desc")

up42/processing.py

+58-10
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@
22
import dataclasses
33
import datetime
44
import enum
5-
from typing import ClassVar, List, Optional, TypedDict, Union
5+
from typing import ClassVar, Iterator, List, Optional, TypedDict, Union
66

77
import pystac
88
import requests
99

10-
from up42 import base, host
10+
from up42 import base, host, utils
1111

1212

1313
@dataclasses.dataclass(frozen=True)
@@ -42,6 +42,17 @@ class JobMetadata(TypedDict):
4242
updated: str
4343

4444

45+
class JobSorting:
    """Sorting fields that can be passed as ``sort_by`` to ``Job.all``.

    Each attribute is a ``utils.SortingField``; use its ``.asc`` / ``.desc``
    properties to choose the direction.
    """

    # The strings below are the field names sent in the ``sort`` query parameter.
    process_id = utils.SortingField(name="processID")
    status = utils.SortingField(name="status")
    created = utils.SortingField(name="created")
    credits = utils.SortingField(name="creditConsumption.credits")
50+
51+
52+
def _to_datetime(value: Optional[str]) -> Optional[datetime.datetime]:
    """Parse an ISO 8601 timestamp string into a ``datetime``.

    Args:
        value: Timestamp text, optionally ending in ``Z``, or ``None``.

    Returns:
        The parsed datetime, or ``None`` when ``value`` is ``None`` or empty.
        Trailing ``Z`` characters are stripped before parsing, so a value
        without any other offset yields a naive datetime.

    Raises:
        ValueError: If the remaining text is not accepted by
            ``datetime.datetime.fromisoformat``.
    """
    # Explicit guard instead of ``value and ...`` so a falsy input such as ""
    # is never leaked back to the caller in place of None.
    if not value:
        return None
    return datetime.datetime.fromisoformat(value.rstrip("Z"))
54+
55+
4556
@dataclasses.dataclass
4657
class Job:
4758
session = base.Session()
@@ -56,10 +67,6 @@ class Job:
5667
started: Optional[datetime.datetime] = None
5768
finished: Optional[datetime.datetime] = None
5869

59-
@staticmethod
60-
def __to_datetime(value: Optional[str]):
61-
return value and datetime.datetime.fromisoformat(value.rstrip("Z"))
62-
6370
@staticmethod
6471
def from_metadata(metadata: JobMetadata) -> "Job":
6572
return Job(
@@ -69,10 +76,10 @@ def from_metadata(metadata: JobMetadata) -> "Job":
6976
workspace_id=metadata["workspaceID"],
7077
definition=metadata["definition"],
7178
status=JobStatus(metadata["status"]),
72-
created=Job.__to_datetime(metadata["created"]),
73-
started=Job.__to_datetime(metadata["started"]),
74-
finished=Job.__to_datetime(metadata["finished"]),
75-
updated=Job.__to_datetime(metadata["updated"]),
79+
created=_to_datetime(metadata["created"]),
80+
started=_to_datetime(metadata["started"]),
81+
finished=_to_datetime(metadata["finished"]),
82+
updated=_to_datetime(metadata["updated"]),
7683
)
7784

7885
@classmethod
@@ -81,6 +88,47 @@ def get(cls, job_id: str) -> "Job":
8188
metadata = cls.session.get(url).json()
8289
return cls.from_metadata(metadata)
8390

91+
@classmethod
def all(
    cls,
    process_id: Optional[List[str]] = None,
    workspace_id: Optional[str] = None,
    status: Optional[List[JobStatus]] = None,
    min_duration: Optional[int] = None,
    max_duration: Optional[int] = None,
    sort_by: Optional[utils.SortingField] = None,
    *,
    # used only for performance tuning and testing
    page_size: Optional[int] = None,
) -> Iterator["Job"]:
    """Lazily iterate over jobs matching the given filters.

    Pages are fetched one at a time from ``/v2/processing/jobs``; the next
    page is requested only after every job of the current page has been
    yielded, by following the link whose ``rel`` is ``"next"``.

    Args:
        process_id: Process identifiers, sent as ``processID``.
        workspace_id: Workspace identifier, sent as ``workspaceID``.
        status: Job statuses; their ``.value`` entries are sent as ``status``.
        min_duration: Sent as ``minDuration``.
        max_duration: Sent as ``maxDuration``.
        sort_by: Sorting field, sent as ``sort`` in its string form.
        page_size: Sent as ``limit``.

    Yields:
        A ``Job`` built with ``Job.from_metadata`` for each entry under
        ``"jobs"`` on each page.
    """
    filters = {
        "workspaceID": workspace_id,
        "processID": process_id,
        "status": [entry.value for entry in status] if status else None,
        "minDuration": min_duration,
        "maxDuration": max_duration,
        "limit": page_size,
        "sort": sort_by,
    }
    # Falsy filters (None, 0, "", empty list) are omitted; everything else is
    # passed through str().
    # NOTE(review): str() on a list produces its Python repr (e.g. "['a']"),
    # and a duration of 0 is dropped - confirm both match what the API expects.
    query_params = {name: str(setting) for name, setting in filters.items() if setting}

    current = cls.session.get(host.endpoint("/v2/processing/jobs"), params=query_params).json()
    while current:
        for metadata in current["jobs"]:
            yield Job.from_metadata(metadata)
        # Take the first "next" link, if any; without one the loop stops.
        following_url = None
        for link in current["links"]:
            if link["rel"] == "next":
                following_url = link["href"]
                break
        current = following_url and cls.session.get(following_url).json()
131+
84132

85133
CostType = Union[int, float, "Cost"]
86134

up42/utils.py

+19
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import copy
2+
import dataclasses
23
import datetime
34
import functools
45
import importlib.metadata
@@ -424,3 +425,21 @@ def stac_client(auth: requests.auth.AuthBase):
424425
url=host.endpoint("/v2/assets/stac/"),
425426
request_modifier=request_modifier,
426427
)
428+
429+
430+
@dataclasses.dataclass(frozen=True)
class SortingField:
    """Immutable sort criterion: a field name plus a direction.

    ``str(field)`` renders it as ``"<name>,asc"`` or ``"<name>,desc"``.
    """

    # Name of the field to sort by.
    name: str
    # True for ascending order, False for descending.
    ascending: bool = True

    @property
    def asc(self) -> "SortingField":
        """Return a copy of this field with ascending direction."""
        # replace() sets the direction explicitly instead of relying on the
        # field default, and keeps the concrete class of ``self``.
        return dataclasses.replace(self, ascending=True)

    @property
    def desc(self) -> "SortingField":
        """Return a copy of this field with descending direction."""
        return dataclasses.replace(self, ascending=False)

    def __str__(self) -> str:
        order = "asc" if self.ascending else "desc"
        return f"{self.name},{order}"

0 commit comments

Comments
 (0)