
Commit 9dc8a90

rework interaction with rq (#567)

* rework interaction with rq

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

1 parent: dd3f49b

23 files changed: +197 −191 lines

.vscode/launch.json

Lines changed: 0 additions & 2 deletions

```diff
@@ -11,7 +11,6 @@
       "cwd": "${workspaceFolder}/backend",
       "envFile": "${workspaceFolder}/backend/.env",
       "env": {
-        "BACKEND_TYPE": "api",
         "PYTHONPATH": "${workspaceFolder}/backend/src"
       }
     },
@@ -26,7 +25,6 @@
       "cwd": "${workspaceFolder}/backend",
       "envFile": "${workspaceFolder}/backend/.env",
       "env": {
-        "BACKEND_TYPE": "worker",
         "PYTHONPATH": "${workspaceFolder}/backend/src"
       }
     },
```

backend/src/backend_api_entrypoint.sh

Lines changed: 0 additions & 2 deletions

```diff
@@ -3,8 +3,6 @@
 set -e
 source .venv/bin/activate
 
-export BACKEND_TYPE="api"
-
 LOG_LEVEL=${LOG_LEVEL:-debug}
 API_PORT=${API_PORT:-5500}
 API_WORKERS=${API_WORKERS:-10}
```

backend/src/celery_background_jobs_worker_entrypoint.sh

Lines changed: 0 additions & 1 deletion

```diff
@@ -5,7 +5,6 @@ source .venv/bin/activate
 
 export OMP_NUM_THREADS=1
 export MKL_NUM_THREADS=1
-export BACKEND_TYPE="worker"
 
 LOG_LEVEL=${LOG_LEVEL:-debug}
 CELERY_BACKGROUND_JOBS_WORKER_CONCURRENCY=${CELERY_BACKGROUND_JOBS_WORKER_CONCURRENCY:-1}
```

backend/src/modules/concept_over_time_analysis/cota_endpoint.py

Lines changed: 2 additions & 4 deletions

```diff
@@ -2,9 +2,7 @@
 from common.dependencies import get_current_user, get_db_session
 from core.auth.authz_user import AuthzUser
 from fastapi import APIRouter, Depends
-from modules.concept_over_time_analysis.cota_crud import (
-    crud_cota,
-)
+from modules.concept_over_time_analysis.cota_crud import crud_cota
 from modules.concept_over_time_analysis.cota_dto import (
     COTACreate,
     COTACreateIntern,
@@ -234,5 +232,5 @@ async def get_cota_job(
     authz_user: AuthzUser = Depends(),
 ) -> COTARefinementJobRead:
     job = js.get_job(job_id=cota_job_id)
-    authz_user.assert_in_project(job.meta["project_id"])
+    authz_user.assert_in_project(job.get_project_id())
     return COTARefinementJobRead.from_rq_job(job=job)
```
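
Here and in the import endpoint further down, callers stop reaching into rq's raw meta dict (`job.meta["project_id"]`, `job.meta["created"]`, `job.id`) and instead go through accessors on a `Job` type imported from `systems.job_system.job_dto`. The wrapper's implementation is not part of this diff; the following is only a minimal sketch of what such accessors could look like, assuming the wrapper holds the underlying rq job and still keeps `project_id` and `created` in its meta dict:

```python
# Hypothetical sketch only: the real systems.job_system.job_dto.Job
# is not shown in this commit.
from datetime import datetime

from rq.job import Job as RQJob


class Job:
    def __init__(self, rq_job: RQJob) -> None:
        self._job = rq_job  # underlying rq job

    def get_id(self) -> str:
        return self._job.id

    def get_project_id(self) -> int:
        # assumption: project_id is still stored in rq's meta dict
        return self._job.meta["project_id"]

    def get_created(self) -> datetime:
        # assumption: a creation timestamp is written to meta at enqueue time
        return self._job.meta["created"]
```

Whatever the real implementation, the benefit visible in the diff is that the meta key names live in one place instead of being repeated at every call site.
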
Lines changed: 15 additions & 29 deletions

```diff
@@ -1,21 +1,13 @@
-from modules.concept_over_time_analysis.cota_crud import (
-    crud_cota,
-)
-from modules.concept_over_time_analysis.cota_dto import (
-    COTARead,
-    COTARefinementJobInput,
-)
+from modules.concept_over_time_analysis.cota_crud import crud_cota
+from modules.concept_over_time_analysis.cota_dto import COTARead, COTARefinementJobInput
 from modules.concept_over_time_analysis.refinement_steps.finetune_apply_compute import (
     finetune_apply_compute,
 )
 from modules.concept_over_time_analysis.refinement_steps.init_search_space import (
     init_search_space,
 )
-from modules.concept_over_time_analysis.refinement_steps.store_in_db import (
-    store_in_db,
-)
-from rq import get_current_job
-from systems.job_system.job_dto import EndpointGeneration, JobPriority
+from modules.concept_over_time_analysis.refinement_steps.store_in_db import store_in_db
+from systems.job_system.job_dto import EndpointGeneration, Job, JobPriority
 from systems.job_system.job_register_decorator import register_job
 
 
@@ -26,22 +18,18 @@
     priority=JobPriority.DEFAULT,
     generate_endpoints=EndpointGeneration.NONE,
 )
-def cota_refinement(
-    payload: COTARefinementJobInput,
-) -> None:
+def cota_refinement(payload: COTARefinementJobInput, job: Job) -> None:
     from repos.db.sql_repo import SQLRepo
 
-    job = get_current_job()
-    assert job is not None, "Job must be running in a worker context"
-
     # init steps / current_step
-    job.meta["steps"] = [
-        "Initialize search space",
-        "Finetune and apply compute",
-        "Store in DB",
-    ]
-    job.meta["current_step"] = 0
-    job.save_meta()
+    job.update(
+        steps=[
+            "Initialize search space",
+            "Finetune and apply compute",
+            "Store in DB",
+        ],
+        current_step=0,
+    )
 
     with SQLRepo().db_session() as db:
         # make sure the cota exists!
@@ -55,10 +43,8 @@ def cota_refinement(
         # Do the refinement in 3 steps:
         search_space = init_search_space(db=db, cota=cota)
 
-        job.meta["current_step"] = 1
-        job.save_meta()
+        job.update(current_step=1)
         search_space = finetune_apply_compute(cota=cota, search_space=search_space)
 
-        job.meta["current_step"] = 2
-        job.save_meta()
+        job.update(current_step=2)
         store_in_db(db=db, cota_id=cota.id, search_space=search_space)
```
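
Every `job.meta[...] = ...` / `job.save_meta()` pair in the old code collapses into one `job.update(...)` call, which can also set several fields at once (`steps` and `current_step` above). A plausible shape for `update`, continuing the hypothetical wrapper sketched earlier:

```python
from typing import Any


class Job:
    # ... accessors as sketched above ...

    def update(self, **kwargs: Any) -> None:
        # assumption: copy every keyword argument into rq's meta dict
        # and persist once, instead of one save_meta() per field
        self._job.meta.update(kwargs)
        self._job.save_meta()
```

Batching several fields into one call also means a single persistence round-trip instead of one per field.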

backend/src/modules/concept_over_time_analysis/cota_service.py

Lines changed: 2 additions & 4 deletions

```diff
@@ -7,9 +7,7 @@
 from core.metadata.project_metadata_crud import crud_project_meta
 from core.metadata.project_metadata_dto import ProjectMetadataRead
 from fastapi.encoders import jsonable_encoder
-from modules.concept_over_time_analysis.cota_crud import (
-    crud_cota,
-)
+from modules.concept_over_time_analysis.cota_crud import crud_cota
 from modules.concept_over_time_analysis.cota_dto import (
     COTACreateIntern,
     COTARead,
@@ -234,6 +232,6 @@ def start_refinement_job(
         db=db,
         id=payload.cota_id,
         update_dto=COTAUpdateIntern(
-            last_refinement_job_id=job.id,
+            last_refinement_job_id=job.get_id(),
         ),
     )
```

backend/src/modules/duplicate_finder/duplicate_finder_jobs.py

Lines changed: 8 additions & 17 deletions

```diff
@@ -10,10 +10,9 @@
 )
 from modules.word_frequency.word_frequency_crud import crud_word_frequency
 from repos.db.sql_repo import SQLRepo
-from rq import get_current_job
 from scipy import sparse
 from sklearn.metrics.pairwise import manhattan_distances
-from systems.job_system.job_dto import EndpointGeneration, JobPriority
+from systems.job_system.job_dto import EndpointGeneration, Job, JobPriority
 from systems.job_system.job_register_decorator import register_job
 
 
@@ -26,12 +25,9 @@
 )
 def find_duplicates_job(
     payload: DuplicateFinderInput,
+    job: Job,
 ) -> DuplicateFinderOutput:
-    job = get_current_job()
-    assert job is not None, "Job must be running in a worker context"
-
-    job.meta["status_message"] = "Started duplicate finding"
-    job.save_meta()
+    job.update(status_message="Started duplicate finding")
 
     logger.info("Finding duplicate text sdocs")
     t0 = time.time()
@@ -40,8 +36,7 @@ def find_duplicates_job(
         db, project_id=payload.project_id, doctype=DocType.text
     )
     t1 = time.time()
-    job.meta["status_message"] = "Fetched word frequencies from database"
-    job.save_meta()
+    job.update(status_message="Fetched word frequencies from database")
     logger.info(f"query took: {t1 - t0}")
 
     t0 = time.time()
@@ -75,8 +70,7 @@ def find_duplicates_job(
         (values, (index, indices)), shape=(len(idx2sdoc_id), vocab_size)
     )
     t1 = time.time()
-    job.meta["status_message"] = "Created document word vectors"
-    job.save_meta()
+    job.update(status_message="Created document word vectors")
     logger.info(f"document vector creation took: {t1 - t0}")
     logger.info(f"vocab size: {vocab_size}")
     logger.info(f"document_vectors shape: {document_vectors.shape}")
@@ -85,8 +79,7 @@ def find_duplicates_job(
     t0 = time.time()
     word_dists = manhattan_distances(document_vectors, document_vectors)
     t1 = time.time()
-    job.meta["status_message"] = "Computed distances between documents"
-    job.save_meta()
+    job.update(status_message="Computed distances between documents")
     logger.info(f"manhatten distance took: {t1 - t0}")
 
     # mask out self distances and one half of the matrix
@@ -103,8 +96,7 @@ def find_duplicates_job(
         )
     ).tolist()
     t1 = time.time()
-    job.meta["status_message"] = "Identified duplicate pairs"
-    job.save_meta()
+    job.update(status_message="Identified duplicate pairs")
     logger.info(f"finding duplicates took: {t1 - t0}")
 
     # map back to sdoc_ids
@@ -120,8 +112,7 @@ def find_duplicates_job(
     G.to_undirected()
     subgraph_nodes = [list(subgraph) for subgraph in nx.connected_components(G)]
     t1 = time.time()
-    job.meta["status_message"] = "Finished finding duplicates!"
-    job.save_meta()
+    job.update(status_message="Finished finding duplicates!")
     logger.info(f"graph grouping took: {t1 - t0}")
 
     return DuplicateFinderOutput(duplicates=subgraph_nodes)
```
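
The other half of the rework is visible in every job signature: functions like `find_duplicates_job` no longer call `rq.get_current_job()` and assert on the result; they simply declare a `job: Job` parameter. The `register_job` decorator is not shown in this commit, but one way such injection could work, reusing the assertion the old code performed inline (the `Job` wrapper is the hypothetical one sketched earlier):

```python
import functools

from rq import get_current_job


def register_job(**job_opts):
    """Hypothetical sketch; the real decorator also handles job
    registration, priorities, and endpoint generation."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(payload, *args, **kwargs):
            rq_job = get_current_job()
            assert rq_job is not None, "Job must be running in a worker context"
            # hand the wrapped job to the function instead of raw rq access
            return func(payload, *args, job=Job(rq_job), **kwargs)

        return wrapper

    return decorator
```

Moving the `get_current_job()` call into the decorator removes the same three boilerplate lines from every job function and keeps rq out of the job modules' imports entirely.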

backend/src/modules/eximport/export_jobs.py

Lines changed: 2 additions & 4 deletions

```diff
@@ -1,5 +1,5 @@
 from modules.eximport.export_job_dto import ExportJobInput, ExportJobOutput
-from systems.job_system.job_dto import EndpointGeneration, JobPriority
+from systems.job_system.job_dto import EndpointGeneration, Job, JobPriority
 from systems.job_system.job_register_decorator import register_job
 
 
@@ -10,9 +10,7 @@
     priority=JobPriority.DEFAULT,
     generate_endpoints=EndpointGeneration.MINIMAL,
 )
-def export_data(
-    payload: ExportJobInput,
-) -> ExportJobOutput:
+def export_data(payload: ExportJobInput, job: Job) -> ExportJobOutput:
     from modules.eximport.export_service import ExportService
 
     return ExportService().handle_export_job(
```

backend/src/modules/eximport/import_endpoint.py

Lines changed: 3 additions & 7 deletions

```diff
@@ -3,11 +3,7 @@
 from common.dependencies import get_current_user
 from core.auth.authz_user import AuthzUser
 from fastapi import APIRouter, Depends, UploadFile
-from modules.eximport.import_job_dto import (
-    ImportJobInput,
-    ImportJobRead,
-    ImportJobType,
-)
+from modules.eximport.import_job_dto import ImportJobInput, ImportJobRead, ImportJobType
 from modules.eximport.import_service import ImportJobPreparationError, ImportService
 from repos.filesystem_repo import FilesystemRepo
 from systems.job_system.job_service import JobService
@@ -137,7 +133,7 @@ def get_import_job(
     *, import_job_id: str, authz_user: AuthzUser = Depends()
 ) -> ImportJobRead:
     job = js.get_job(import_job_id)
-    authz_user.assert_in_project(job.meta["project_id"])
+    authz_user.assert_in_project(job.get_project_id())
     return ImportJobRead.from_rq_job(job)
 
 
@@ -152,5 +148,5 @@ def get_all_import_jobs(
     authz_user.assert_in_project(project_id)
 
     jobs = js.get_jobs_by_project(job_type="import", project_id=project_id)
-    jobs.sort(key=lambda x: x.meta["created"], reverse=True)
+    jobs.sort(key=lambda x: x.get_created(), reverse=True)
     return [ImportJobRead.from_rq_job(job) for job in jobs]
```
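
The sort in `get_all_import_jobs` stays a one-liner, just routed through the accessor, so the `"created"` meta key name (and any missing-key handling) is encapsulated in the wrapper rather than repeated at call sites. Usage under the sketched wrapper:

```python
# newest jobs first, without touching rq's meta dict at the call site
jobs.sort(key=lambda job: job.get_created(), reverse=True)
```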

backend/src/modules/eximport/import_jobs.py

Lines changed: 2 additions & 4 deletions

```diff
@@ -1,5 +1,5 @@
 from modules.eximport.import_job_dto import ImportJobInput
-from systems.job_system.job_dto import EndpointGeneration, JobPriority
+from systems.job_system.job_dto import EndpointGeneration, Job, JobPriority
 from systems.job_system.job_register_decorator import register_job
 
 
@@ -10,9 +10,7 @@
     priority=JobPriority.DEFAULT,
     generate_endpoints=EndpointGeneration.NONE,
 )
-def import_data(
-    payload: ImportJobInput,
-) -> None:
+def import_data(payload: ImportJobInput, job: Job) -> None:
     from modules.eximport.import_service import ImportService
 
     return ImportService().handle_import_job(
```
