
Commit 64fc497

Merge pull request #435 from uhh-lt/import: Import for text, audio, video pipelines

2 parents eec177d + ddb28ef

160 files changed: +4891 / -2229 lines


backend/.env.example

Lines changed: 1 addition & 1 deletion

@@ -16,7 +16,7 @@ JWT_SECRET=
 
 # Where to store uploaded files.
 # <path_to_dats_repo>/docker/backend_repo
-REPO_ROOT=/insert_path_to_dats_repo/docker/backend_repo
+SHARED_REPO_ROOT=/insert_path_to_dats_repo/docker/backend_repo
 
 # The system user is automatically created and owns automatically generated data.
 SYSTEM_USER_EMAIL="[email protected]"
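
The only change here is the rename of REPO_ROOT to SHARED_REPO_ROOT. As a minimal sketch (not part of the commit), a deployment could sanity-check the renamed setting like this, assuming it is exposed as a plain environment variable:

import os
from pathlib import Path

# SHARED_REPO_ROOT replaces the former REPO_ROOT; the variable name comes from the
# diff above, but how the backend actually loads its config is not shown here.
shared_repo_root = Path(os.environ["SHARED_REPO_ROOT"])
if not shared_repo_root.is_dir():
    raise RuntimeError(f"Upload directory does not exist: {shared_repo_root}")

Existing deployments that still define REPO_ROOT would presumably need to rename the variable accordingly.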
New Alembic migration (revision 4eb64db5a67a)

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
+"""Add token time starts and ends to sdoc data
+
+Revision ID: 4eb64db5a67a
+Revises: 050f9378a3e1
+Create Date: 2025-01-09 17:00:29.037251
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from sqlalchemy.dialects import postgresql
+
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "4eb64db5a67a"
+down_revision: Union[str, None] = "050f9378a3e1"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+
+    # TODO: if anybody on production uses audio or video files
+    # - read metadata word-level-transcriptions -> write to correpsonding new sdoc_data field
+    # - read .transcript.txt files -> transfer to sdoc_data content & html of corresponding audio / video data
+    # - if the transcript was annotated -> move all annotations to corresponding audio / video file annotations
+
+    op.add_column(
+        "sourcedocumentdata",
+        sa.Column("token_time_starts", postgresql.ARRAY(sa.Integer()), nullable=True),
+    )
+    op.add_column(
+        "sourcedocumentdata",
+        sa.Column("token_time_ends", postgresql.ARRAY(sa.Integer()), nullable=True),
+    )
+    # ### end Alembic commands ###
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column("sourcedocumentdata", "token_time_ends")
+    op.drop_column("sourcedocumentdata", "token_time_starts")
+    # ### end Alembic commands ###
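
The migration only adds the two array columns; the TODO above notes that word-level transcriptions of existing audio/video documents would still have to be copied into them. A rough sketch of that mapping, assuming each word in the stored transcription metadata carries start and end timestamps in milliseconds (the field names below are hypothetical):

def transcription_to_token_times(words: list[dict]) -> tuple[list[int], list[int]]:
    # words: e.g. [{"text": "hello", "start_ms": 0, "end_ms": 420}, ...] (assumed shape)
    token_time_starts = [int(w["start_ms"]) for w in words]
    token_time_ends = [int(w["end_ms"]) for w in words]
    return token_time_starts, token_time_ends

The two lists would then be written to the new token_time_starts and token_time_ends columns of the corresponding sourcedocumentdata row.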
New Alembic migration (revision 970c55224a39)

Lines changed: 71 additions & 0 deletions

@@ -0,0 +1,71 @@
+"""add repo_url to sdocdata
+
+Revision ID: 970c55224a39
+Revises: 4eb64db5a67a
+Create Date: 2025-01-13 10:21:43.457535
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from sqlalchemy.orm import sessionmaker
+
+from alembic import op
+from app.core.data.crud.source_document import crud_sdoc
+from app.core.data.doc_type import DocType
+from app.core.data.dto.source_document import SourceDocumentRead
+from app.core.data.repo.repo_service import RepoService
+
+# revision identifiers, used by Alembic.
+revision: str = "970c55224a39"
+down_revision: Union[str, None] = "4eb64db5a67a"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    # add the column, non-nullable
+    op.add_column(
+        "sourcedocumentdata", sa.Column("repo_url", sa.String(), nullable=True)
+    )
+
+    conn = op.get_bind()
+
+    # 1. Read all existing project ids
+    projects = conn.execute(sa.text("SELECT id FROM project")).fetchall()
+    print("Found projects:", len(projects))
+
+    # 2. Read all existing Source Documents
+    db = sessionmaker(bind=conn)()
+    for row in projects:
+        proj_id = row.id
+        print("Processing project:", proj_id)
+
+        sdocs = crud_sdoc.read_by_project(db=db, proj_id=proj_id, only_finished=False)  # type: ignore
+
+        # 3. Use the repo service to get the URL of the Source Document
+        urls = []
+        for sdoc in sdocs:
+            url = RepoService().get_sdoc_url(
+                sdoc=SourceDocumentRead.model_validate(sdoc),
+                relative=True,
+                webp=sdoc.doctype == DocType.image,
+                thumbnail=False,
+            )
+            urls.append(url)
+
+        # 4. Update the repo_url field in the Source Document Data table
+        for sdoc, url in zip(sdocs, urls):
+            op.execute(
+                f"UPDATE sourcedocumentdata SET repo_url = '{url}' WHERE id = {sdoc.id}"
+            )
+
+    # change the column to non-nullable
+    op.alter_column("sourcedocumentdata", "repo_url", nullable=False)
+
+
+def downgrade() -> None:
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column("sourcedocumentdata", "repo_url")
+    # ### end Alembic commands ###
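
Note that the backfill interpolates url and sdoc.id directly into the UPDATE statement. A bound-parameter variant of the same statement would look like this (an illustrative alternative, not what the commit does):

conn.execute(
    sa.text("UPDATE sourcedocumentdata SET repo_url = :url WHERE id = :id"),
    {"url": url, "id": sdoc.id},
)

Since the URLs are generated by RepoService rather than taken from user input, the f-string works here, but bound parameters avoid quoting problems if a path ever contains a single quote.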
New API router for import jobs

Lines changed: 156 additions & 0 deletions

@@ -0,0 +1,156 @@
+import uuid
+
+from fastapi import APIRouter, Depends, File, HTTPException, UploadFile
+from sqlalchemy.orm import Session
+
+from api.dependencies import get_current_user, get_db_session
+from app.celery.background_jobs import prepare_and_start_import_job_async
+from app.core.authorization.authz_user import AuthzUser
+from app.core.data.crud.project import crud_project
+from app.core.data.dto.import_job import (
+    ImportJobParameters,
+    ImportJobRead,
+    ImportJobType,
+)
+from app.core.data.dto.project import ProjectCreate
+from app.core.data.dto.user import UserRead
+from app.core.data.import_.import_service import ImportService
+from app.core.data.repo.repo_service import RepoService
+
+router = APIRouter(
+    prefix="/import", dependencies=[Depends(get_current_user)], tags=["import"]
+)
+
+ims: ImportService = ImportService()
+repo: RepoService = RepoService()
+
+
+@router.post(
+    "/{proj_id}/codes",
+    response_model=ImportJobRead,
+    summary="Starts the import codes job on given project id.",
+)
+def start_import_codes_job(
+    *,
+    # Ahmad: Since we're uploading a file we have to use multipart/form-data directly in the router method (see project put)
+    proj_id: int,
+    uploaded_file: UploadFile = File(
+        ...,
+        description=("CSV file of codes that gets uploaded into project"),
+    ),
+    authz_user: AuthzUser = Depends(),
+) -> ImportJobRead:
+    authz_user.assert_in_project(proj_id)
+    if not uploaded_file:
+        raise HTTPException(
+            status_code=418,
+            detail="Missing codes file.",
+        )
+    if not __is_file_csv(uploaded_file=uploaded_file):
+        raise HTTPException(
+            status_code=415,
+            detail="Codes need to be in csv format.",
+        )
+    user_id = authz_user.user.id
+    filename = f"import_user_code_{user_id}_{proj_id}.csv"
+    filepath = repo.get_dst_path_for_temp_file(filename)
+    filepath = repo.store_uploaded_file(
+        uploaded_file=uploaded_file, filepath=filepath, fn=filename
+    )
+
+    import_job_params = ImportJobParameters(
+        proj_id=proj_id,
+        filename=filename,
+        user_id=user_id,
+        import_job_type=ImportJobType.CODES,
+    )
+    return prepare_and_start_import_job_async(import_job_params=import_job_params)
+
+
+@router.post(
+    "/{proj_id}/tags",
+    response_model=ImportJobRead,
+    summary="Starts the import tags job on given project.",
+)
+def start_import_tags_job(
+    *,
+    # Ahmad: Since we're uploading a file we have to use multipart/form-data directly in the router method (see project put)
+    proj_id: int,
+    uploaded_file: UploadFile = File(
+        ...,
+        description=("CSV file of codes that gets uploaded into project"),
+    ),
+    authz_user: AuthzUser = Depends(),
+) -> ImportJobRead:
+    authz_user.assert_in_project(proj_id)
+    if not __is_file_csv(uploaded_file=uploaded_file):
+        raise HTTPException(
+            status_code=415,
+            detail="Codes need to be in csv format.",
+        )
+    user_id = authz_user.user.id
+    filename = f"import_tags_{user_id}_{proj_id}.csv"
+    filepath = repo.get_dst_path_for_temp_file(filename)
+    filepath = repo.store_uploaded_file(
+        uploaded_file=uploaded_file, filepath=filepath, fn=filename
+    )
+
+    import_job_params = ImportJobParameters(
+        proj_id=proj_id,
+        filename=filename,
+        user_id=user_id,
+        import_job_type=ImportJobType.TAGS,
+    )
+    return prepare_and_start_import_job_async(import_job_params=import_job_params)
+
+
+@router.post(
+    "",
+    response_model=ImportJobRead,
+    summary="Starts the import project job on given project",
+)
+def start_import_project_job(
+    *,
+    db: Session = Depends(get_db_session),
+    uploaded_file: UploadFile = File(
+        ...,
+        description=("Zip file of project metadata that gets uploaded into project"),
+    ),
+    current_user: UserRead = Depends(get_current_user),
+) -> ImportJobRead:
+    if not __is_file_zip(uploaded_file=uploaded_file):
+        raise HTTPException(
+            status_code=415,
+            detail="Project need to be in zip format.",
+        )
+    user_id = current_user.id
+    random_temp_project_name = str(uuid.uuid4())
+    filename = f"import_project_{random_temp_project_name}_for_user_{user_id}.zip"
+    filepath = repo.get_dst_path_for_temp_file(filename)
+    filepath = repo.store_uploaded_file(
+        uploaded_file=uploaded_file, filepath=filepath, fn=filename
+    )
+    project_create = ProjectCreate(title=random_temp_project_name, description="")
+    db_obj = crud_project.create(
+        db=db, create_dto=project_create, creating_user=current_user
+    )
+
+    import_job_params = ImportJobParameters(
+        proj_id=db_obj.id,
+        filename=filename,
+        user_id=user_id,
+        import_job_type=ImportJobType.PROJECT,
+    )
+    return prepare_and_start_import_job_async(import_job_params=import_job_params)
+
+
+def __is_file_csv(uploaded_file: UploadFile):
+    return uploaded_file.content_type == "text/csv"
+
+
+def __is_file_json(uploaded_file: UploadFile):
+    return uploaded_file.content_type == "application/json"
+
+
+def __is_file_zip(uploaded_file: UploadFile):
+    return uploaded_file.content_type == "application/zip"
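
As a rough client-side sketch of the new endpoints (host, project id, and token are placeholders, and any global API prefix is omitted), the codes import could be triggered like this:

import requests

with open("codes.csv", "rb") as f:
    resp = requests.post(
        "https://<dats-host>/import/1/codes",  # hypothetical host and project id
        headers={"Authorization": "Bearer <token>"},
        # the multipart field must be named "uploaded_file" and sent as text/csv,
        # otherwise the endpoint rejects the request with 415
        files={"uploaded_file": ("codes.csv", f, "text/csv")},
    )
resp.raise_for_status()
print(resp.json())  # ImportJobRead describing the queued import job

The tags endpoint works the same way via /import/{proj_id}/tags, and the project import posts a zip file to /import.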

backend/src/api/endpoints/source_document.py

Lines changed: 1 addition & 21 deletions

@@ -19,7 +19,6 @@
 from app.core.data.crud.source_document_metadata import crud_sdoc_meta
 from app.core.data.crud.span_annotation import crud_span_anno
 from app.core.data.crud.span_group import crud_span_group
-from app.core.data.doc_type import DocType
 from app.core.data.dto.bbox_annotation import (
     BBoxAnnotationRead,
     BBoxAnnotationReadResolved,
@@ -91,26 +90,7 @@ def get_by_id_with_data(
     crud_sdoc.get_status(db=db, sdoc_id=sdoc_id, raise_error_on_unfinished=True)
 
     sdoc_data = crud_sdoc.read_data(db=db, id=sdoc_id)
-    if sdoc_data is None:
-        # if data is none, that means the document is not a text document
-        # instead of returning html, we return the URL to the image / video / audio file
-        sdoc = SourceDocumentRead.model_validate(crud_sdoc.read(db=db, id=sdoc_id))
-        url = RepoService().get_sdoc_url(
-            sdoc=sdoc,
-            relative=True,
-            webp=sdoc.doctype == DocType.image,
-            thumbnail=False,
-        )
-        return SourceDocumentDataRead(
-            id=sdoc_id,
-            project_id=sdoc.project_id,
-            token_character_offsets=[],
-            tokens=[],
-            sentences=[],
-            html=url,
-        )
-    else:
-        return SourceDocumentDataRead.model_validate(sdoc_data)
+    return SourceDocumentDataRead.model_validate(sdoc_data)
 
 
 @router.delete(
backend/src/app/celery/background_jobs/__init__.py

Lines changed: 21 additions & 3 deletions

@@ -1,13 +1,16 @@
 from pathlib import Path
 from typing import Any, List
 
-from celery import Task
+from celery import Task, group
+from celery.result import GroupResult
 
 from app.core.data.crawler.crawler_service import CrawlerService
 from app.core.data.dto.crawler_job import CrawlerJobParameters, CrawlerJobRead
 from app.core.data.dto.export_job import ExportJobParameters, ExportJobRead
+from app.core.data.dto.import_job import ImportJobParameters, ImportJobRead
 from app.core.data.dto.llm_job import LLMJobParameters2, LLMJobRead
 from app.core.data.export.export_service import ExportService
+from app.core.data.import_.import_service import ImportService
 from app.core.data.llm.llm_service import LLMService
 from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
 
@@ -53,10 +56,23 @@ def prepare_and_start_export_job_async(
 
     exs: ExportService = ExportService()
     ex_job = exs.prepare_export_job(export_params)
+    print("-----ex id", ex_job.id)
     start_export_job.apply_async(kwargs={"export_job": ex_job})
     return ex_job
 
 
+def prepare_and_start_import_job_async(
+    import_job_params: ImportJobParameters,
+) -> ImportJobRead:
+    from app.celery.background_jobs.tasks import start_import_job
+
+    assert isinstance(start_import_job, Task), "Not a Celery Task"
+    ims: ImportService = ImportService()
+    ims_job = ims.prepare_import_job(import_job_params)
+    start_import_job.apply_async(kwargs={"import_job": ims_job})
+    return ims_job
+
+
 def prepare_and_start_crawling_job_async(
     crawler_params: CrawlerJobParameters,
 ) -> CrawlerJobRead:
@@ -98,7 +114,7 @@ def prepare_and_start_llm_job_async(
 
 def execute_text_preprocessing_pipeline_apply_async(
     cargos: List[PipelineCargo],
-) -> None:
+) -> GroupResult:
     from app.celery.background_jobs.tasks import (
         execute_text_preprocessing_pipeline_task,
     )
@@ -107,8 +123,10 @@ def execute_text_preprocessing_pipeline_apply_async(
         execute_text_preprocessing_pipeline_task, Task
     ), "Not a Celery Task"
 
+    tasks = []
     for cargo in cargos:
-        execute_text_preprocessing_pipeline_task.apply_async(kwargs={"cargo": cargo})
+        tasks.append(execute_text_preprocessing_pipeline_task.s(cargo=cargo))
+    return group(tasks).apply_async()
 
 
 def execute_image_preprocessing_pipeline_apply_async(
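
Because execute_text_preprocessing_pipeline_apply_async now submits the per-document tasks as a Celery group and returns the GroupResult instead of None, callers can track completion of the whole batch. A minimal sketch of how a caller might use the return value (the waiting strategy is illustrative, not taken from this commit):

result = execute_text_preprocessing_pipeline_apply_async(cargos)

# GroupResult aggregates one AsyncResult per submitted pipeline task
if result.ready():  # True once every task in the group has finished
    print(f"{result.completed_count()} of {len(result.results)} documents preprocessed")

This presumably matters for the new project import, which needs to know when all imported documents have finished preprocessing.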
