Merged
Changes from 12 commits
1 change: 1 addition & 0 deletions backend/pyproject.toml
@@ -68,6 +68,7 @@ dependencies = [
"sqlalchemy-utils==0.41.1",
"srsly==2.4.8",
"starlette==0.46.1",
"tenacity>=9.1.2",
"tqdm==4.66.3",
"twisted==22.10.0",
"typesense==0.21.0",
13 changes: 13 additions & 0 deletions backend/src/app/core/data/crud/span_text.py
@@ -1,5 +1,7 @@
from typing import Dict, List, Optional

import tenacity
from psycopg2.errors import UniqueViolation
from sqlalchemy.orm import Session

from app.core.data.crud.crud_base import CRUDBase, UpdateNotAllowed
@@ -19,15 +21,26 @@ def create(self, db: Session, *, create_dto: SpanTextCreate) -> SpanTextORM:
return super().create(db=db, create_dto=create_dto)
return db_obj

@tenacity.retry(
wait=tenacity.wait_random(),
stop=tenacity.stop_after_attempt(5),
retry=tenacity.retry_if_exception_type(UniqueViolation),
reraise=True,
)
def create_multi(
self, db: Session, *, create_dtos: List[SpanTextCreate]
) -> List[SpanTextORM]:
text_to_create_dto = {create_dto.text: create_dto for create_dto in create_dtos}
unique_create_dtos = list(text_to_create_dto.values())
dtos_to_create = []

# When multiple large documents with similar content are imported in parallel, the unique
# constraint on the text field can be violated because the read-then-create check for
# unique create DTOs below (lines 37-44) is not atomic.
# Hence, as a quick-and-dirty fix, we retry the whole method on UniqueViolation.
text_to_db_obj_map: Dict[str, SpanTextORM] = {}
for unique_create_dto in unique_create_dtos:
# TODO: this is very inefficient. Can't we read all at once?
db_obj = self.read_by_text(db=db, text=unique_create_dto.text)
if db_obj is None:
dtos_to_create.append(unique_create_dto)
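Reviewer note: a minimal, self-contained sketch of the retry behaviour the new decorator adds, assuming `tenacity` and `psycopg2` are installed; the failing function below is a stand-in, not code from this PR.

```python
import tenacity
from psycopg2.errors import UniqueViolation

attempts = 0


@tenacity.retry(
    wait=tenacity.wait_random(min=0, max=1),
    stop=tenacity.stop_after_attempt(5),
    retry=tenacity.retry_if_exception_type(UniqueViolation),
    reraise=True,
)
def insert_texts() -> str:
    # Stand-in for create_multi: fails twice to simulate a concurrent writer
    # winning the race on the unique text column, then succeeds.
    global attempts
    attempts += 1
    if attempts < 3:
        raise UniqueViolation()
    return f"succeeded on attempt {attempts}"


print(insert_texts())  # -> succeeded on attempt 3
```

As the inline comment already notes, this is a stop-gap; an atomic upsert (e.g. PostgreSQL `INSERT ... ON CONFLICT DO NOTHING`) would avoid the race entirely.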
11 changes: 8 additions & 3 deletions backend/src/app/core/data/repo/repo_service.py
@@ -69,8 +69,10 @@ def __init__(self, dst_path: Path):


class ErroneousArchiveException(Exception):
def __init__(self, archive_path: Path):
super().__init__(f"Error with Archive {archive_path}")
def __init__(self, archive_path: Path, msg: Optional[str] = None):
super().__init__(
f"Error with Archive {archive_path}{' :' + msg if msg else ''}"
)


class RepoService(metaclass=SingletonMeta):
@@ -465,7 +467,10 @@ def extract_archive_in_project(

logger.info(f"Extracting archive at {archive_path_in_project} ...")
if not zipfile.is_zipfile(archive_path_in_project):
raise ErroneousArchiveException(archive_path=archive_path_in_project)
raise ErroneousArchiveException(
archive_path=archive_path_in_project,
msg="Not a valid zip file!",
)
logger.info(f"Extracting archive {archive_path_in_project.name} to {dst} ...")
# taken from: https://stackoverflow.com/a/4917469
# flattens the extracted file hierarchy
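A tiny usage sketch of the extended exception; the archive path is illustrative and the class body mirrors the diff above.

```python
from pathlib import Path
from typing import Optional


class ErroneousArchiveException(Exception):
    def __init__(self, archive_path: Path, msg: Optional[str] = None):
        super().__init__(
            f"Error with Archive {archive_path}{': ' + msg if msg else ''}"
        )


print(ErroneousArchiveException(Path("upload.zip")))
# Error with Archive upload.zip
print(ErroneousArchiveException(Path("upload.zip"), "Not a valid zip file!"))
# Error with Archive upload.zip: Not a valid zip file!
```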
5 changes: 5 additions & 0 deletions backend/src/app/core/data/repo/utils.py
@@ -16,3 +16,8 @@ def image_to_base64(image: Image.Image) -> str:
buffered = io.BytesIO()
image.save(buffered, format="JPEG")
return base64.b64encode(buffered.getvalue()).decode("utf-8")


def base64_to_image(base64_string: str) -> Image.Image:
img_data = base64.b64decode(base64_string)
return Image.open(io.BytesIO(img_data)).convert("RGB")
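A quick round-trip sketch for the two helpers (assumes Pillow is available); note the round trip is lossy because `image_to_base64` encodes as JPEG.

```python
from PIL import Image

from app.core.data.repo.utils import base64_to_image, image_to_base64

img = Image.new("RGB", (4, 4), color="red")
b64 = image_to_base64(img)         # JPEG bytes, base64-encoded as str
restored = base64_to_image(b64)    # PIL.Image in RGB mode
assert restored.size == (4, 4)
```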
19 changes: 19 additions & 0 deletions backend/src/app/preprocessing/pipeline/model/pipeline_cargo.py
@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, Any, Dict, List

from loguru import logger
from pydantic import BaseModel, ConfigDict, Field, SkipValidation

from app.core.data.dto.preprocessing_job import PreprocessingJobPayloadRead
@@ -27,3 +28,21 @@ class PipelineCargo(BaseModel):

data: Dict[str, Any] = Field(description="data", default_factory=dict)
model_config = ConfigDict(arbitrary_types_allowed=True)

def _flush_next_steps(self) -> None:
# Gracefully finish the cargo by flushing all next steps to finished steps. This way, the PreProService
# will skip the remaining steps. If you change this, also adjust the skipping mechanism in the PPS.
try:
fn = self.data["pptd"].filepath.name
except Exception:
fn = None

logger.info(
f"Gracefully finishing preprocessing {'for ' + fn if fn else ''} cargo..."
)
logger.debug(
f"Preemptively flushing all {len(self.next_steps)} next steps{'for ' + fn if fn else ''} to finished steps...."
)
while len(self.next_steps) > 0:
step = self.next_steps.pop(0)
self.finished_steps.append(step)
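For clarity, a minimal stand-in showing the flush semantics outside the Pydantic model; the step names are made up.

```python
class _CargoStub:
    """Plain stand-in for PipelineCargo: only the two step lists."""

    def __init__(self, next_steps: list[str]) -> None:
        self.next_steps = list(next_steps)
        self.finished_steps: list[str] = []

    def flush_next_steps(self) -> None:
        # Move every remaining step over so downstream processing treats them as done.
        while self.next_steps:
            self.finished_steps.append(self.next_steps.pop(0))


cargo = _CargoStub(["extract_html", "detect_language", "run_spacy"])
cargo.flush_next_steps()
assert cargo.next_steps == []
assert cargo.finished_steps == ["extract_html", "detect_language", "run_spacy"]
```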
@@ -8,11 +8,14 @@ def add_text_init_steps(pipeline: PreprocessingPipeline) -> None:
from app.preprocessing.pipeline.steps.text.init.create_pptd import (
create_pptd,
)
from app.preprocessing.pipeline.steps.text.init.extract_content_in_html_from_pdf_docs import (
extract_content_in_html_from_pdf_docs,
)
from app.preprocessing.pipeline.steps.text.init.extract_content_in_html_from_raw_text_docs import (
extract_content_in_html_from_raw_text_docs,
)
from app.preprocessing.pipeline.steps.text.init.extract_content_in_html_from_word_or_pdf_docs import (
extract_content_in_html_from_word_or_pdf_docs,
from app.preprocessing.pipeline.steps.text.init.extract_content_in_html_from_word_docs import (
extract_content_in_html_from_word_docs,
)

pipeline.register_step(
@@ -22,7 +25,12 @@ def add_text_init_steps(pipeline: PreprocessingPipeline) -> None:

pipeline.register_step(
required_data=["pptd"],
func=extract_content_in_html_from_word_or_pdf_docs,
func=extract_content_in_html_from_word_docs,
)

pipeline.register_step(
required_data=["pptd"],
func=extract_content_in_html_from_pdf_docs,
)

pipeline.register_step(
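The split into two registered steps works because each step is a no-op for file types it does not handle; a toy sketch of that guard pattern (a plain dict stands in for the cargo).

```python
def pdf_step(cargo: dict) -> dict:
    if cargo["suffix"] != ".pdf":
        return cargo  # not a PDF: pass through unchanged
    cargo["html"] = "<html>...from PDF...</html>"
    return cargo


def word_step(cargo: dict) -> dict:
    if cargo["suffix"] not in (".docx", ".doc"):
        return cargo  # not a Word file: pass through unchanged
    cargo["html"] = "<html>...from Word...</html>"
    return cargo


cargo = {"suffix": ".docx"}
for step in (word_step, pdf_step):
    cargo = step(cargo)
assert "from Word" in cargo["html"]
```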
152 changes: 152 additions & 0 deletions backend/src/app/preprocessing/pipeline/steps/text/init/extract_content_in_html_from_pdf_docs.py
@@ -0,0 +1,152 @@
import math
from pathlib import Path

import fitz
from app.core.data.repo.repo_service import RepoService
from app.core.data.repo.utils import base64_to_image
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc
from app.preprocessing.preprocessing_service import PreprocessingService
from app.preprocessing.ray_model_service import RayModelService
from config import conf
from loguru import logger

cc = conf.celery

repo = RepoService()
pps = PreprocessingService()
rms = RayModelService()


def __split_large_pdf_into_chunks(
input_doc: Path, max_pages_per_chunk: int = 5
) -> list[Path] | None:
try:
src = fitz.open(str(input_doc)) # type: ignore
total_pages = src.page_count
except Exception as e:
msg = f"Error opening PDF {input_doc.name}: {e}"
logger.error(msg)
raise RuntimeError(msg)

# First, we check if the PDF needs to be split
num_splits = math.ceil(total_pages / max_pages_per_chunk)
if num_splits == 1:
logger.info(f"PDF {input_doc.name} has {total_pages} pages; no split needed.")
src.close()
return None

# If yes, we proceed to split the PDF and save the chunks to disk in the project repo
out_dir = input_doc.parent
logger.info(
f"Splitting PDF {input_doc.name} into {num_splits} chunks of "
f"up to {max_pages_per_chunk} pages each. Output will be saved in {out_dir}."
)

chunks: list[Path] = []
for i in range(num_splits):
start_page = i * max_pages_per_chunk + 1
end_page = min((i + 1) * max_pages_per_chunk, total_pages)
page_range_str = f"{start_page}-{end_page}"
output_fn = out_dir / f"{input_doc.stem}_pages_{page_range_str}.pdf"
try:
# Create a new PDF for the chunk
new_pdf = fitz.open() # type: ignore
new_pdf.insert_pdf(src, from_page=start_page - 1, to_page=end_page - 1)

# Save the chunk to disk
new_pdf.save(str(output_fn))
new_pdf.close()
chunks.append(output_fn)

logger.debug(f"Stored chunk '{output_fn}'")
except Exception as e:
msg = f"Skipping due to error creating chunk {i + 1} for PDF {input_doc.name}: {e}"
logger.error(msg)
src.close()
return chunks


def __extract_content_in_html_from_pdf_docs(
filepath: Path, extract_images: bool = True
) -> tuple[str, list[Path]]:
if not filepath.exists() or filepath.suffix != ".pdf":
logger.error(f"File {filepath} is not a PDF document!")
return "", []

logger.debug(f"Extracting content as HTML from {filepath.name} ...")
pdf_bytes = filepath.read_bytes()
# this will take some time ...
conversion_output = rms.docling_pdf_to_html(pdf_bytes=pdf_bytes)
doc_html = conversion_output.html_content

# store all extracted images in the same directory as the PDF
extracted_images: list[Path] = []
if extract_images:
output_path = filepath.parent
for img_fn, b64_img in conversion_output.base64_images.items():
img_fn = Path(img_fn)
img_path = output_path / (img_fn.stem + ".png")
img = base64_to_image(b64_img)
img.save(img_path, format="PNG")
extracted_images.append(img_path)
logger.debug(f"Saved extracted image {img_path} from PDF {filepath.name}.")

return doc_html, extracted_images


def extract_content_in_html_from_pdf_docs(
cargo: PipelineCargo,
) -> PipelineCargo:
## STRATEGY:
# 0. Check if the PDF needs to be chunked, i.e., if it has more than N (by default 5) pages.
# YES:
# 1. Chunk the PDF
# 2. stop prepro for cargo (not the whole PPJ!)
# 3. create a new PPJ from the chunks
# NO:
# 1. continue with extracting content as HTML including images from PDF via Docling through RayModelService

## TODO Open Questions:
# - how to properly link the chunks concerning the page order and SDoc links to navigate in the UI?
# - can we maybe have some sort of Parent SDoc (no Adoc!) that links the chunk sdocs?

pptd: PreProTextDoc = cargo.data["pptd"]
filepath = pptd.filepath

if filepath.suffix != ".pdf":
return cargo

# Split large PDFs into chunks if necessary
chunks = __split_large_pdf_into_chunks(
filepath, max_pages_per_chunk=cc.preprocessing.max_pages_per_pdf_chunk
)

if chunks:
# YES -> stop prepro for cargo, start PPJ with all chunks
# (we cannot stop the whole PPJ because it might contain more payloads)
cargo._flush_next_steps()

logger.info(f"Starting new PPJ for {len(chunks)} PDF chunks ...")
ppj = pps.prepare_and_start_preprocessing_job_async(
proj_id=cargo.ppj_payload.project_id,
uploaded_files=None,
archive_file_path=None,
unimported_project_files=chunks,
)
logger.info(
f"Started new PreprocessingJob {ppj.id} for {len(chunks)} PDF chunks."
)

return cargo

# NO -> continue with extracting content as HTML from PDF
html, extracted_images = __extract_content_in_html_from_pdf_docs(
filepath,
extract_images=cc.preprocessing.extract_images_from_pdf,
)

pptd.html = html
pptd.extracted_images = extracted_images

return cargo
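A quick check of the chunking arithmetic used in `__split_large_pdf_into_chunks` (pure math, no PyMuPDF needed); page ranges are 1-based, matching the chunk file names.

```python
import math


def chunk_ranges(total_pages: int, max_pages_per_chunk: int = 5) -> list[tuple[int, int]]:
    num_splits = math.ceil(total_pages / max_pages_per_chunk)
    return [
        (i * max_pages_per_chunk + 1, min((i + 1) * max_pages_per_chunk, total_pages))
        for i in range(num_splits)
    ]


print(chunk_ranges(12))  # [(1, 5), (6, 10), (11, 12)]
print(chunk_ranges(4))   # [(1, 4)] -> a single chunk, so no split is performed
```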
71 changes: 71 additions & 0 deletions backend/src/app/preprocessing/pipeline/steps/text/init/extract_content_in_html_from_word_docs.py
@@ -0,0 +1,71 @@
from pathlib import Path
from typing import Dict, List, Tuple
from uuid import uuid4

import mammoth
from app.core.data.repo.repo_service import RepoService
from app.preprocessing.pipeline.model.pipeline_cargo import PipelineCargo
from app.preprocessing.pipeline.model.text.preprotextdoc import PreProTextDoc
from config import conf
from loguru import logger

cc = conf.celery

repo = RepoService()


def __extract_content_in_html_from_word_docs(filepath: Path) -> Tuple[str, List[Path]]:
if filepath.suffix != ".docx" and filepath.suffix != ".doc":
logger.warning(f"File {filepath} is not a Word document!")
return "", []

extracted_images: List[Path] = []

def convert_image(image) -> Dict[str, str]:
if not cc.preprocessing.extract_images_from_docx:
return {"src": ""}

fn = filepath.parent / f"image_{str(uuid4())}"
if "png" in image.content_type:
fn = fn.with_suffix(".png")
elif "jpg" in image.content_type:
fn = fn.with_suffix(".jpg")
elif "jpeg" in image.content_type:
fn = fn.with_suffix(".jpeg")
else:
return {"src": ""}

with image.open() as image_bytes:
with open(fn, "wb") as binary_file:
binary_file.write(image_bytes.read())
extracted_images.append(fn)
return {"src": str(fn.name)}

with open(str(filepath), "rb") as docx_file:
html = mammoth.convert_to_html(
docx_file, convert_image=mammoth.images.img_element(convert_image)
)

return f"<html><body>{html.value}</body></html>", extracted_images


def extract_content_in_html_from_word_docs(
cargo: PipelineCargo,
) -> PipelineCargo:
pptd: PreProTextDoc = cargo.data["pptd"]
filepath = pptd.filepath

if filepath.suffix not in [".docx", ".doc"]:
return cargo

logger.debug(f"Extracting content as HTML from {filepath.name} for ...")

html, extracted_images = __extract_content_in_html_from_word_docs(filepath)
extracted_images = (
extracted_images if cc.preprocessing.extract_images_from_docx else []
)

pptd.html = html
pptd.extracted_images = extracted_images

return cargo
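A minimal sketch of the `mammoth` image-callback contract used above (assumes `mammoth` is installed; the input path is hypothetical): the callback receives an image object and returns the attributes of the generated `<img>` element, so returning `{"src": ""}` effectively drops the image.

```python
import mammoth


def drop_all_images(image) -> dict:
    # Empty src keeps the <img> tag but discards the image data, mirroring
    # the behaviour when image extraction is disabled in the config.
    return {"src": ""}


with open("example.docx", "rb") as f:  # hypothetical input file
    result = mammoth.convert_to_html(
        f, convert_image=mammoth.images.img_element(drop_all_images)
    )

print(result.value[:80])  # HTML fragment (no <html>/<body> wrapper, hence the wrapping above)
```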