Skip to content

Commit 31d7b0c

Browse files
committed
import audio and video pipeline
1 parent 11095a2 commit 31d7b0c

File tree

11 files changed

+145
-56
lines changed

11 files changed

+145
-56
lines changed

backend/src/app/celery/background_jobs/preprocess.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,25 @@ def execute_image_preprocessing_pipeline_(
4242
pipeline.execute(cargo=cargo)
4343

4444

45-
def execute_audio_preprocessing_pipeline_(cargo: PipelineCargo) -> None:
46-
pipeline = prepro.get_audio_pipeline()
45+
def execute_audio_preprocessing_pipeline_(
46+
cargo: PipelineCargo,
47+
is_init: bool = True,
48+
) -> None:
49+
pipeline = prepro.get_audio_pipeline(is_init=is_init)
4750
logger.debug(
4851
f"Executing audio Preprocessing Pipeline\n\t{pipeline}\n\t for cargo"
4952
f" {cargo.ppj_payload.filename}!"
5053
)
51-
pipeline.execute(cargo=cargo)
54+
pipeline.execute(
55+
cargo=cargo,
56+
)
5257

5358

54-
def execute_video_preprocessing_pipeline_(cargo: PipelineCargo) -> None:
55-
pipeline = prepro.get_video_pipeline()
59+
def execute_video_preprocessing_pipeline_(
60+
cargo: PipelineCargo,
61+
is_init: bool = True,
62+
) -> None:
63+
pipeline = prepro.get_video_pipeline(is_init=is_init)
5664
logger.debug(
5765
f"Executing video Preprocessing Pipeline\n\t{pipeline}\n\t for cargo"
5866
f" {cargo.ppj_payload.filename}!"

backend/src/app/celery/background_jobs/tasks.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,17 +90,23 @@ def execute_image_preprocessing_pipeline_task(
9090
autoretry_for=(Exception,),
9191
retry_kwargs={"max_retries": 5, "countdown": 5},
9292
)
93-
def execute_audio_preprocessing_pipeline_task(cargo: PipelineCargo) -> None:
94-
execute_audio_preprocessing_pipeline_(cargo=cargo)
93+
def execute_audio_preprocessing_pipeline_task(
94+
cargo: PipelineCargo,
95+
is_init: bool = True,
96+
) -> None:
97+
execute_audio_preprocessing_pipeline_(cargo=cargo, is_init=is_init)
9598

9699

97100
@celery_worker.task(
98101
acks_late=True,
99102
autoretry_for=(Exception,),
100103
retry_kwargs={"max_retries": 5, "countdown": 5},
101104
)
102-
def execute_video_preprocessing_pipeline_task(cargo: PipelineCargo) -> None:
103-
execute_video_preprocessing_pipeline_(cargo=cargo)
105+
def execute_video_preprocessing_pipeline_task(
106+
cargo: PipelineCargo,
107+
is_init: bool = True,
108+
) -> None:
109+
execute_video_preprocessing_pipeline_(cargo=cargo, is_init=is_init)
104110

105111

106112
@celery_worker.task(

backend/src/app/core/data/import_/import_service.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,7 @@ def _import_project(
746746
sdoc_name, mime_type
747747
)
748748
sdoc_doctype = get_doc_type(mime_type)
749+
logger.info(f"Sdoc doctype {sdoc_doctype}")
749750
assert sdoc_doctype, "Expected Doctype to be not None."
750751

751752
# move raw sdocs
@@ -875,8 +876,10 @@ def _import_project(
875876

876877
# 4. init import pipelines
877878
from app.celery.background_jobs.tasks import (
879+
execute_audio_preprocessing_pipeline_task,
878880
execute_image_preprocessing_pipeline_task,
879881
execute_text_preprocessing_pipeline_task,
882+
execute_video_preprocessing_pipeline_task,
880883
)
881884

882885
assert isinstance(
@@ -897,6 +900,27 @@ def _import_project(
897900
for cargo in cargos[DocType.image]
898901
]
899902
tasks.extend(image_tasks)
903+
904+
# 6. init audio pipelines
905+
assert isinstance(
906+
execute_audio_preprocessing_pipeline_task, Task
907+
), "Not a Celery Task"
908+
audio_tasks = [
909+
execute_audio_preprocessing_pipeline_task.s(cargo, is_init=False)
910+
for cargo in cargos[DocType.audio]
911+
]
912+
tasks.extend(audio_tasks)
913+
914+
# 7. init video pipelines
915+
assert isinstance(
916+
execute_video_preprocessing_pipeline_task, Task
917+
), "Not a Celery Task"
918+
video_tasks = [
919+
execute_video_preprocessing_pipeline_task.s(cargo, is_init=False)
920+
for cargo in cargos[DocType.video]
921+
]
922+
tasks.extend(video_tasks)
923+
900924
crud_prepro_job.update(
901925
db=db,
902926
uuid=ppj.id,

backend/src/app/preprocessing/pipeline/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ def build_image_pipeline(
330330

331331

332332
@lru_cache(maxsize=1)
333-
def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:
333+
def build_audio_pipeline(is_init: bool = True) -> PreprocessingPipeline:
334334
# we need to import the steps here to avoid loading models at startup
335335
# in the api worker!
336336
from app.preprocessing.pipeline.steps.audio.convert_to_pcm import convert_to_pcm
@@ -481,7 +481,9 @@ def build_audio_pipeline(foo: str = "bar") -> PreprocessingPipeline:
481481

482482

483483
@lru_cache(maxsize=1)
484-
def build_video_pipeline(foo: str = "bar") -> PreprocessingPipeline:
484+
def build_video_pipeline(
485+
is_init: bool = True,
486+
) -> PreprocessingPipeline:
485487
from app.preprocessing.pipeline.steps.audio.convert_to_pcm import convert_to_pcm
486488
from app.preprocessing.pipeline.steps.audio.create_ffmpeg_probe_audio_metadata import (
487489
create_ffmpeg_probe_audio_metadata,

backend/src/app/preprocessing/pipeline/steps/audio/create_ffmpeg_probe_audio_metadata.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
def create_ffmpeg_probe_audio_metadata(cargo: PipelineCargo) -> PipelineCargo:
1919
ppad: PreProAudioDoc = cargo.data["ppad"]
20+
ffmpeg_probe = None
2021
for metadata_key in EXPECTED_METADATA:
2122
if metadata_key not in ppad.metadata:
22-
ffmpeg_probe = ffmpeg.probe(ppad.filepath)
23-
23+
if ffmpeg_probe is None:
24+
ffmpeg_probe = ffmpeg.probe(ppad.filepath)
2425
for k, v in ffmpeg_probe["format"].items():
2526
ppad.metadata[k] = v
2627
return cargo

backend/src/app/preprocessing/pipeline/steps/audio/generate_automatic_transcription.py

Lines changed: 31 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,37 @@
1919

2020
def generate_automatic_transcription(cargo: PipelineCargo) -> PipelineCargo:
2121
ppad: PreProAudioDoc = cargo.data["ppad"]
22-
logger.debug(f"Generating automatic transcription for {ppad.filename} ...")
23-
if ppad.uncompressed_audio_filepath is None:
24-
raise ValueError(
25-
f"Uncompressed audio filepath for {ppad.filename} is None. "
26-
"Please run the 'convert_to_pcm' step first!"
27-
)
28-
29-
# Create Whisper Input
30-
whisper_input = WhisperFilePathInput(
31-
uncompressed_audio_fp=os.path.basename(str(ppad.uncompressed_audio_filepath)),
32-
project_id=ppad.project_id,
33-
)
34-
transcription: WhisperTranscriptionOutput = rms.whisper_transcribe(whisper_input)
35-
36-
# Create Wordlevel Transcriptions
37-
for segment in transcription.segments:
38-
for word in segment.words:
39-
wlt = WordLevelTranscription(
40-
text=word.text,
41-
start_ms=word.start_ms,
42-
end_ms=word.end_ms,
22+
if "word_level_transcriptions" not in ppad.metadata:
23+
logger.debug(f"Generating automatic transcription for {ppad.filename} ...")
24+
if ppad.uncompressed_audio_filepath is None:
25+
raise ValueError(
26+
f"Uncompressed audio filepath for {ppad.filename} is None. "
27+
"Please run the 'convert_to_pcm' step first!"
4328
)
44-
ppad.word_level_transcriptions.append(wlt)
4529

46-
wlt = list(map(lambda wlt: wlt.model_dump(), ppad.word_level_transcriptions))
47-
ppad.metadata["word_level_transcriptions"] = json.dumps(wlt)
30+
# Create Whisper Input
31+
whisper_input = WhisperFilePathInput(
32+
uncompressed_audio_fp=os.path.basename(
33+
str(ppad.uncompressed_audio_filepath)
34+
),
35+
project_id=ppad.project_id,
36+
)
37+
transcription: WhisperTranscriptionOutput = rms.whisper_transcribe(
38+
whisper_input
39+
)
40+
41+
# Create Wordlevel Transcriptions
42+
for segment in transcription.segments:
43+
for word in segment.words:
44+
wlt = WordLevelTranscription(
45+
text=word.text,
46+
start_ms=word.start_ms,
47+
end_ms=word.end_ms,
48+
)
49+
ppad.word_level_transcriptions.append(wlt)
50+
51+
wlt = list(map(lambda wlt: wlt.model_dump(), ppad.word_level_transcriptions))
52+
ppad.metadata["word_level_transcriptions"] = json.dumps(wlt)
53+
else:
54+
logger.info("Import word level transcriptions")
4855
return cargo
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from loguru import logger
2+
from sqlalchemy.orm import Session
3+
4+
from app.core.data.crud.document_tag import crud_document_tag
5+
from app.core.data.orm.source_document import SourceDocumentORM
6+
from app.core.data.repo.repo_service import RepoService
7+
from app.core.db.sql_service import SQLService
8+
from app.preprocessing.pipeline.model.preprodoc_base import PreProDocBase
9+
10+
repo: RepoService = RepoService()
11+
sql: SQLService = SQLService()
12+
13+
14+
def persist_tags(
15+
db: Session, sdoc_db_obj: SourceDocumentORM, ppd: PreProDocBase
16+
) -> None:
17+
logger.info(f"Persisting SourceDocument Tags for {ppd.filename}...")
18+
tags = ppd.tags
19+
if len(tags) > 0:
20+
crud_document_tag.link_multiple_document_tags(
21+
db=db,
22+
sdoc_ids=[sdoc_db_obj.id],
23+
tag_ids=tags,
24+
)

backend/src/app/preprocessing/pipeline/steps/text/write_pptd_to_database.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
from app.core.data.crud.annotation_document import crud_adoc
99
from app.core.data.crud.code import crud_code
10-
from app.core.data.crud.document_tag import crud_document_tag
1110
from app.core.data.crud.project import crud_project
1211
from app.core.data.crud.source_document import crud_sdoc
1312
from app.core.data.crud.source_document_link import crud_sdoc_link
@@ -30,6 +29,7 @@
3029
from app.preprocessing.pipeline.model.text.autospan import AutoSpan
3130
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc
3231
from app.preprocessing.pipeline.steps.common.persist_sdoc_data import persist_sdoc_data
32+
from app.preprocessing.pipeline.steps.common.persist_tags import persist_tags
3333
from app.util.color import get_next_color
3434

3535
repo: RepoService = RepoService()
@@ -88,19 +88,6 @@ def _persist_sdoc_metadata(
8888
crud_sdoc_meta.create_multi(db=db, create_dtos=metadata_create_dtos)
8989

9090

91-
def _persist_tags(
92-
db: Session, sdoc_db_obj: SourceDocumentORM, pptd: PreProTextDoc
93-
) -> None:
94-
logger.info(f"Persisting SourceDocument Tags for {pptd.filename}...")
95-
tags = pptd.tags
96-
if len(tags) > 0:
97-
crud_document_tag.link_multiple_document_tags(
98-
db=db,
99-
sdoc_ids=[sdoc_db_obj.id],
100-
tag_ids=tags,
101-
)
102-
103-
10491
def _persist_sdoc_links(
10592
db: Session, sdoc_db_obj: SourceDocumentORM, pptd: PreProTextDoc
10693
) -> None:
@@ -209,7 +196,7 @@ def write_pptd_to_database(cargo: PipelineCargo) -> PipelineCargo:
209196
_persist_sdoc_metadata(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)
210197

211198
# persist Tags
212-
_persist_tags(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)
199+
persist_tags(db=db, sdoc_db_obj=sdoc_db_obj, ppd=pptd)
213200

214201
# persist SourceDocument Links
215202
_persist_sdoc_links(db=db, sdoc_db_obj=sdoc_db_obj, pptd=pptd)

backend/src/app/preprocessing/pipeline/steps/video/create_ppad_from_video.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
1+
from loguru import logger
2+
13
from app.preprocessing.pipeline.model.audio.preproaudiodoc import PreProAudioDoc
24
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
35
from app.preprocessing.pipeline.model.video.preprovideodoc import PreProVideoDoc
46

7+
POSSIBLE_METADATA = [
8+
"word_level_transcriptions",
9+
"language",
10+
]
11+
512

613
def create_ppad_from_video(cargo: PipelineCargo) -> PipelineCargo:
714
ppvd: PreProVideoDoc = cargo.data["ppvd"]
@@ -14,6 +21,10 @@ def create_ppad_from_video(cargo: PipelineCargo) -> PipelineCargo:
1421
project_id=ppvd.project_id,
1522
mime_type="audio/mpeg",
1623
)
24+
for metadata_key in POSSIBLE_METADATA:
25+
if metadata_key in ppvd.metadata:
26+
logger.info(f"Passing {metadata_key} from video metadata to audio metadata")
27+
ppad.metadata[metadata_key] = ppvd.metadata[metadata_key]
1728

1829
cargo.data["ppad"] = ppad
1930
return cargo

backend/src/app/preprocessing/pipeline/steps/video/create_ppvd.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from loguru import logger
2+
13
from app.core.data.repo.repo_service import RepoService
24
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
35
from app.preprocessing.pipeline.model.video.preprovideodoc import PreProVideoDoc
@@ -11,12 +13,23 @@ def create_ppvd(cargo: PipelineCargo) -> PipelineCargo:
1113
)
1214
if not filepath.exists():
1315
raise FileNotFoundError(f"File {filepath} not found in repository!")
16+
additional_parameters = dict()
17+
if "metadata" in cargo.data:
18+
additional_parameters["metadata"] = cargo.data["metadata"]
19+
if "sdoc_link" in cargo.data:
20+
additional_parameters["sdoc_link_create_dtos"] = cargo.data["sdoc_link"]
21+
if "tags" in cargo.data:
22+
additional_parameters["tags"] = cargo.data["tags"]
23+
logger.info(
24+
f"Adding additional parameters to the create PPVD with {additional_parameters}"
25+
)
1426

1527
ppvd = PreProVideoDoc(
1628
filename=cargo.ppj_payload.filename,
1729
project_id=cargo.ppj_payload.project_id,
1830
mime_type=cargo.ppj_payload.mime_type,
1931
filepath=filepath,
32+
**additional_parameters,
2033
)
2134

2235
cargo.data["ppvd"] = ppvd

0 commit comments

Comments
 (0)