89 changes: 0 additions & 89 deletions backend/src/core/celery/background_jobs/__init__.py
@@ -4,36 +4,9 @@
 from celery import Task, group
 from celery.result import GroupResult
 from modules.crawler.crawler_job_dto import CrawlerJobParameters, CrawlerJobRead
-from modules.eximport.export_job_dto import ExportJobParameters, ExportJobRead
-from modules.eximport.import_job_dto import ImportJobParameters, ImportJobRead
-from modules.ml.ml_job_dto import MLJobParameters, MLJobRead
-from modules.perspectives.perspectives_job import (
-    PerspectivesJobParams,
-    PerspectivesJobRead,
-)
 from preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
 
 
-def start_cota_refinement_job_async(
-    cota_job_id: str,
-) -> None:
-    from core.celery.background_jobs.tasks import start_cota_refinement_job_task
-
-    assert isinstance(start_cota_refinement_job_task, Task), "Not a Celery Task"
-
-    start_cota_refinement_job_task.apply_async(kwargs={"cota_job_id": cota_job_id})
-
-
-def start_trainer_job_async(
-    trainer_job_id: str,
-) -> None:
-    from core.celery.background_jobs.tasks import start_trainer_job_task
-
-    assert isinstance(start_trainer_job_task, Task), "Not a Celery Task"
-
-    start_trainer_job_task.apply_async(kwargs={"trainer_job_id": trainer_job_id})
-
-
 def import_uploaded_archive_apply_async(
     archive_file_path: Path, project_id: int
 ) -> Any:
@@ -46,34 +19,6 @@ def import_uploaded_archive_apply_async(
     )
 
 
-def prepare_and_start_export_job_async(
-    export_params: ExportJobParameters,
-) -> ExportJobRead:
-    from core.celery.background_jobs.tasks import start_export_job
-    from modules.eximport.export_service import ExportService
-
-    assert isinstance(start_export_job, Task), "Not a Celery Task"
-
-    exs: ExportService = ExportService()
-    ex_job = exs.prepare_export_job(export_params)
-    print("-----ex id", ex_job.id)
-    start_export_job.apply_async(kwargs={"export_job": ex_job})
-    return ex_job
-
-
-def prepare_and_start_import_job_async(
-    import_job_params: ImportJobParameters,
-) -> ImportJobRead:
-    from core.celery.background_jobs.tasks import start_import_job
-    from modules.eximport.import_service import ImportService
-
-    assert isinstance(start_import_job, Task), "Not a Celery Task"
-    ims: ImportService = ImportService()
-    ims_job = ims.prepare_import_job(import_job_params)
-    start_import_job.apply_async(kwargs={"import_job": ims_job})
-    return ims_job
-
-
 def prepare_and_start_crawling_job_async(
     crawler_params: CrawlerJobParameters,
 ) -> CrawlerJobRead:
@@ -102,40 +47,6 @@ def prepare_and_start_crawling_job_async(
     return cj
 
 
-def prepare_and_start_ml_job_async(
-    ml_job_params: MLJobParameters,
-) -> MLJobRead:
-    from core.celery.background_jobs.tasks import start_ml_job
-    from modules.ml.ml_service import MLService
-
-    assert isinstance(start_ml_job, Task), "Not a Celery Task"
-
-    mls: MLService = MLService()
-    ml_job = mls.prepare_ml_job(ml_job_params)
-    start_ml_job.apply_async(kwargs={"ml_job": ml_job})
-    return ml_job
-
-
-def prepare_and_start_perspectives_job_async(
-    project_id: int,
-    aspect_id: int,
-    perspectives_job_params: PerspectivesJobParams,
-) -> PerspectivesJobRead:
-    from core.celery.background_jobs.tasks import start_perspectives_job
-    from modules.perspectives.perspectives_job_service import PerspectivesJobService
-
-    assert isinstance(start_perspectives_job, Task), "Not a Celery Task"
-
-    pjs: PerspectivesJobService = PerspectivesJobService()
-    perspectives_job = pjs.prepare_perspectives_job(
-        project_id=project_id,
-        aspect_id=aspect_id,
-        perspectives_params=perspectives_job_params,
-    )
-    start_perspectives_job.apply_async(kwargs={"perspectives_job": perspectives_job})
-    return perspectives_job
-
-
 def execute_text_preprocessing_pipeline_apply_async(
     cargos: list[PipelineCargo],
 ) -> GroupResult:
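Note on this file: every deleted wrapper followed the same dispatch pattern: lazily import the task module (avoiding a circular import between the public API and the task definitions), assert that the imported object really is a registered Celery `Task`, then enqueue it with `apply_async`. A minimal sketch of that pattern, with a hypothetical `start_example_job_task`:

```python
from celery import Task


def start_example_job_async(job_id: str) -> None:
    # Deferred import: the tasks module imports the Celery app, which in turn
    # imports this package, so a top-level import would be circular.
    from core.celery.background_jobs.tasks import start_example_job_task  # hypothetical

    # @celery_worker.task returns a Task instance; fail fast if the decorator
    # was removed or the name was shadowed.
    assert isinstance(start_example_job_task, Task), "Not a Celery Task"

    # Enqueue on the broker; kwargs must be serializable for the worker.
    start_example_job_task.apply_async(kwargs={"job_id": job_id})
```

As a side effect, the cleanup also removes the stray debug line `print("-----ex id", ex_job.id)` from `prepare_and_start_export_job_async`.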
10 changes: 0 additions & 10 deletions backend/src/core/celery/background_jobs/cota.py

This file was deleted.

19 changes: 0 additions & 19 deletions backend/src/core/celery/background_jobs/export.py

This file was deleted.

17 changes: 0 additions & 17 deletions backend/src/core/celery/background_jobs/import_.py

This file was deleted.

12 changes: 0 additions & 12 deletions backend/src/core/celery/background_jobs/ml.py

This file was deleted.

12 changes: 0 additions & 12 deletions backend/src/core/celery/background_jobs/perspectives.py

This file was deleted.

48 changes: 0 additions & 48 deletions backend/src/core/celery/background_jobs/tasks.py
@@ -1,72 +1,24 @@
 from pathlib import Path
 
-from core.celery.background_jobs.cota import start_cota_refinement_job_
 from core.celery.background_jobs.crawl import start_crawler_job_
-from core.celery.background_jobs.export import start_export_job_
-from core.celery.background_jobs.import_ import start_import_job_
-from core.celery.background_jobs.ml import start_ml_job_
-from core.celery.background_jobs.perspectives import start_perspectives_job_
 from core.celery.background_jobs.preprocess import (
     execute_audio_preprocessing_pipeline_,
     execute_image_preprocessing_pipeline_,
     execute_text_preprocessing_pipeline_,
     execute_video_preprocessing_pipeline_,
     import_uploaded_archive_,
 )
-from core.celery.background_jobs.trainer import start_trainer_job_
 from core.celery.celery_worker import celery_worker
 from modules.crawler.crawler_job_dto import CrawlerJobRead
-from modules.eximport.export_job_dto import ExportJobRead
-from modules.eximport.import_job_dto import ImportJobRead
-from modules.ml.ml_job_dto import MLJobRead
-from modules.perspectives.perspectives_job import PerspectivesJobRead
 from preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
 
 
-@celery_worker.task(
-    acks_late=True,
-    autoretry_for=(Exception,),
-    retry_kwargs={"max_retries": 0, "countdown": 5},
-)
-def start_cota_refinement_job_task(cota_job_id: str) -> None:
-    start_cota_refinement_job_(cota_job_id=cota_job_id)
-
-
-@celery_worker.task(
-    acks_late=True,
-    autoretry_for=(Exception,),
-    retry_kwargs={"max_retries": 0, "countdown": 5},
-)
-def start_trainer_job_task(trainer_job_id: str) -> None:
-    start_trainer_job_(trainer_job_id=trainer_job_id)
-
-
-@celery_worker.task(acks_late=True)
-def start_export_job(export_job: ExportJobRead) -> None:
-    start_export_job_(export_job=export_job)
-
-
-@celery_worker.task(acks_late=True)
-def start_import_job(import_job: ImportJobRead) -> None:
-    start_import_job_(import_job=import_job)
-
-
 @celery_worker.task(acks_late=True)
 def start_crawler_job(crawler_job: CrawlerJobRead) -> tuple[Path, int]:
     archive_file_path, project_id = start_crawler_job_(crawler_job=crawler_job)
     return archive_file_path, project_id
 
 
-@celery_worker.task(acks_late=True)
-def start_ml_job(ml_job: MLJobRead) -> None:
-    start_ml_job_(ml_job=ml_job)
-
-
-@celery_worker.task(acks_late=True)
-def start_perspectives_job(perspectives_job: PerspectivesJobRead) -> None:
-    start_perspectives_job_(perspectives_job=perspectives_job)
-
-
 @celery_worker.task(
     acks_late=True,
     autoretry_for=(Exception,),
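Note on the deleted task registrations: `autoretry_for=(Exception,)` together with `retry_kwargs={"max_retries": 0, "countdown": 5}` arms autoretry but caps it at zero attempts, so these tasks never actually retried and the 5-second countdown never took effect; `acks_late=True` merely defers the broker acknowledgement until the task completes. A hedged sketch of the same decorator usage (the app name and broker URL are assumptions):

```python
from celery import Celery

# Stand-in for core.celery.celery_worker.celery_worker; broker URL assumed.
celery_worker = Celery("background_jobs", broker="redis://localhost:6379/0")


@celery_worker.task(
    acks_late=True,  # ack only after the task finishes, so a crashed worker re-delivers
    autoretry_for=(Exception,),  # retry automatically on any exception...
    retry_kwargs={"max_retries": 0, "countdown": 5},  # ...but max_retries=0 forbids retries
)
def start_example_job_task(job_id: str) -> None:
    # Thin shim, like the deleted tasks: delegate to an implementation function.
    print(f"running job {job_id}")
```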
10 changes: 0 additions & 10 deletions backend/src/core/celery/background_jobs/trainer.py

This file was deleted.

@@ -0,0 +1,33 @@
+"""add last_refinement_job_id to cota
+
+Revision ID: 1b21abe44adb
+Revises: 6f484bc7d629
+Create Date: 2025-08-05 15:14:25.001940
+
+"""
+
+from typing import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "1b21abe44adb"
+down_revision: str | None = "6f484bc7d629"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column(
+        "conceptovertimeanalysis",
+        sa.Column("last_refinement_job_id", sa.String(), nullable=True),
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column("conceptovertimeanalysis", "last_refinement_job_id")
+    # ### end Alembic commands ###
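The new migration adds `last_refinement_job_id` as a nullable string, so it applies cleanly to existing rows without a backfill; `alembic upgrade head` applies it and `alembic downgrade -1` reverts it. On the ORM side the column would presumably map to an optional attribute on the COTA model; a sketch under assumed model and base-class names:

```python
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class ConceptOverTimeAnalysisORM(Base):  # class name assumed from the table name
    __tablename__ = "conceptovertimeanalysis"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Mirrors sa.Column("last_refinement_job_id", sa.String(), nullable=True):
    # Mapped[str | None] yields a nullable VARCHAR that defaults to NULL.
    last_refinement_job_id: Mapped[str | None] = mapped_column(default=None)
```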
61 changes: 16 additions & 45 deletions backend/src/modules/concept_over_time_analysis/cota_dto.py
@@ -1,4 +1,3 @@
-import uuid
 from datetime import datetime
 from enum import Enum
 
@@ -7,10 +6,7 @@
 from pydantic import BaseModel, ConfigDict, Field
 from pydantic.functional_validators import field_validator
 from repos.db.dto_base import UpdateDTOBase
-from systems.job_system.background_job_base_dto import (
-    BackgroundJobBase,
-    BackgroundJobBaseUpdate,
-)
+from systems.job_system.job_dto import JobInputBase, JobRead
 
 ####################
 # COTA Base Types
@@ -146,6 +142,10 @@ class COTAUpdate(BaseModel, UpdateDTOBase):
 
 
 class COTAUpdateIntern(BaseModel, UpdateDTOBase):
+    last_refinement_job_id: str | None = Field(
+        description="ID of the last refinement job for the ConceptOverTimeAnalysis",
+        default=None,
+    )
     name: str | None = Field(
         description="Name of the ConceptOverTimeAnalysis",
         default=None,
@@ -179,6 +179,9 @@ class COTARead(ConceptOverTimeAnalysisBaseDTO):
     project_id: int = Field(
         description="Project the ConceptOverTimeAnalysis belongs to"
    )
+    last_refinement_job_id: str | None = Field(
+        description="ID of the last refinement job for the ConceptOverTimeAnalysis"
+    )
     timeline_settings: COTATimelineSettings = Field(
         description="Timeline Analysis Settings of the ConceptOverTimeAnalysis."
     )
@@ -307,47 +310,15 @@ class COTARefinementHyperparameters(BaseModel):
     )
 
 
-class COTARefinementJobBase(BackgroundJobBase):
-    pass
-
-
-class COTARefinementJobCreate(COTARefinementJobBase):
-    cota: COTARead = Field(description="COTA that is used in the COTARefinementJob")
-
-    hyperparams: COTARefinementHyperparameters = Field(
-        description="Hyperparameters of the COTARefinementJob"
-    )
-
-
-class COTARefinementJobUpdate(BackgroundJobBaseUpdate):
-    current_pipeline_step: str | None = Field(
-        description="Current Pipeline Step of the COTARefinementJob",
-        default=None,
-    )
-
-    error_message: str | None = Field(
-        description="Optional ErrorMessage of the COTARefinementJob",
-        default=None,
-    )
-
-
-class COTARefinementJobRead(COTARefinementJobCreate):
-    id: str = Field(
-        description="ID of the COTARefinementJob",
-        default_factory=lambda: str(uuid.uuid4()),
+class COTARefinementJobInput(JobInputBase):
+    cota_id: int = Field(
+        description="ID of the COTA that is used in the COTARefinementJob"
     )
 
-    current_pipeline_step: str | None = Field(
-        description="Current Pipeline Step of the COTARefinementJob",
-        default=None,
-    )
-
-    error_message: str | None = Field(
-        description="Optional ErrorMessage of the COTARefinementJob",
-        default=None,
+    hyperparams: COTARefinementHyperparameters = Field(
+        description="Hyperparameters of the COTARefinementJob",
+        default=COTARefinementHyperparameters(),
     )
 
-    created: datetime = Field(description="Created timestamp of the COTARefinementJob")
-    updated: datetime = Field(description="Updated timestamp of the COTARefinementJob")
-
-    model_config = ConfigDict(from_attributes=True)
+class COTARefinementJobRead(JobRead[COTARefinementJobInput, None]):
+    pass
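Net effect of the DTO refactor: the bespoke `COTARefinementJobBase`/`Create`/`Update`/`Read` hierarchy built on `BackgroundJobBase` collapses into the generic job system. The input now references the COTA by `cota_id` instead of embedding a full `COTARead`, the hyperparameters gain a default, and `JobRead[COTARefinementJobInput, None]` (with `None` as the output type) supplies the id and timestamp fields generically. A self-contained sketch of how the generic read model plausibly fits together; the internals of `JobInputBase` and `JobRead` are assumptions:

```python
from datetime import datetime
from typing import Generic, TypeVar

from pydantic import BaseModel, Field

InputT = TypeVar("InputT", bound="JobInputBase")
OutputT = TypeVar("OutputT")


class JobInputBase(BaseModel):
    # Assumed minimal shape of systems.job_system.job_dto.JobInputBase.
    project_id: int


class JobRead(BaseModel, Generic[InputT, OutputT]):
    # Assumed generic read model: id and timestamps are declared once here
    # rather than repeated on every job-specific *Read DTO.
    id: str
    input: InputT
    output: OutputT | None = None
    created: datetime
    updated: datetime


class COTARefinementJobInput(JobInputBase):
    cota_id: int = Field(description="ID of the COTA that is used in the COTARefinementJob")


class COTARefinementJobRead(JobRead[COTARefinementJobInput, None]):
    pass
```

With this shape, `COTARefinementJobRead` needs no fields of its own, which is exactly what the `pass` body in the diff suggests.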