6 changes: 3 additions & 3 deletions .github/workflows/backend_checks.yml
@@ -60,8 +60,8 @@ jobs:
docker compose -f compose.ray.yml build
docker compose -f compose.ray.yml up --wait
fi
COMPOSE_PROFILES="weaviate,background,backend" docker compose build
COMPOSE_PROFILES="weaviate,background" docker compose up --wait --quiet-pull
COMPOSE_PROFILES="weaviate,background,rq,backend" docker compose build
COMPOSE_PROFILES="weaviate,background,rq" docker compose up --wait --quiet-pull
- name: Check 1 - pytest runs without errors
working-directory: docker
run: |
@@ -89,7 +89,7 @@ jobs:
working-directory: docker
run: |
source .env
COMPOSE_PROFILES="weaviate,background,backend" docker compose up --wait --quiet-pull
COMPOSE_PROFILES="weaviate,background,rq,backend" docker compose up --wait --quiet-pull
- name: Check 6 - Test End-2-End importer script
working-directory: backend
env:
2 changes: 1 addition & 1 deletion .github/workflows/frontend_checks.yml
@@ -15,7 +15,7 @@ jobs:
env:
API_WORKERS: 1
VITE_APP_SERVER: http://localhost:13120
COMPOSE_PROFILES: "weaviate,background,backend,frontend"
COMPOSE_PROFILES: "weaviate,background,rq,backend,frontend"
BACKEND_HAS_CHANGED: false
RAY_HAS_CHANGED: false
steps:
16 changes: 16 additions & 0 deletions .vscode/launch.json
@@ -11,6 +11,22 @@
"cwd": "${workspaceFolder}/backend",
"envFile": "${workspaceFolder}/backend/.env",
"env": {
"BACKEND_TYPE": "api",
"PYTHONPATH": "${workspaceFolder}/backend/src"
}
},
{
"name": "rq",
"type": "debugpy",
"request": "launch",
"program": "${workspaceFolder}/backend/src/worker.py",
"args": ["work"],
"console": "integratedTerminal",
"justMyCode": true,
"cwd": "${workspaceFolder}/backend",
"envFile": "${workspaceFolder}/backend/.env",
"env": {
"BACKEND_TYPE": "worker",
"PYTHONPATH": "${workspaceFolder}/backend/src"
}
},
2 changes: 1 addition & 1 deletion .vscode/settings.json
@@ -14,7 +14,7 @@
"python.analysis.autoFormatStrings": true,
"python.analysis.typeCheckingMode": "basic",
"python.analysis.pyrightVersion": "1.1.385", // this has to match pyproject.toml
"python.analysis.exclude": ["**/node_modules", "**/__pycache__", ".git", "./backend/.venv", "./docker"],
"python.analysis.exclude": ["**/node_modules", "**/__pycache__", ".git", "**/.venv", "**/docker"],

// pytest
"python.testing.unittestEnabled": false,
1 change: 1 addition & 0 deletions backend/configs/development.yaml
@@ -98,6 +98,7 @@ redis:
host: localhost
port: ${oc.env:REDIS_PORT, 13124}
password: ${oc.env:REDIS_PASSWORD, dats123}
rq_idx: 10
clients:
export: 2
crawler: 3
1 change: 1 addition & 0 deletions backend/configs/production.yaml
@@ -98,6 +98,7 @@ redis:
host: redis
port: 6379
password: ${oc.env:REDIS_PASSWORD, dats123}
rq_idx: 10
clients:
export: 2
crawler: 3
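Both config files gain an `rq_idx` key so RQ gets its own Redis database index (10), separate from the numbered client databases used for export, crawler, etc. A minimal sketch of how a queue could be built from this config, assuming OmegaConf-style access and the standard redis/rq APIs (the `get_queue` helper itself is illustrative, not part of this PR):

```python
import redis
from omegaconf import OmegaConf
from rq import Queue

# Assumed config path; matches the development settings shown above.
conf = OmegaConf.load("backend/configs/development.yaml")


def get_queue(name: str = "default") -> Queue:
    # RQ uses its own Redis DB (rq_idx) so its keys don't collide with
    # the export/crawler/... client databases configured under `clients`.
    connection = redis.Redis(
        host=conf.redis.host,
        port=int(conf.redis.port),
        password=str(conf.redis.password),
        db=int(conf.redis.rq_idx),
    )
    return Queue(name, connection=connection)
```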
1 change: 1 addition & 0 deletions backend/pyproject.toml
@@ -61,6 +61,7 @@ dependencies = [
"readability-lxml==0.8.1",
"redis==4.3",
"rope==1.9.0",
"rq>=2.4.1",
"scikit-learn==1.3.2",
"scrapy==2.10.0",
"scrapy-playwright==0.0.31",
2 changes: 2 additions & 0 deletions backend/src/backend_api_entrypoint.sh
@@ -3,6 +3,8 @@
set -e
source .venv/bin/activate

export BACKEND_TYPE="api"

LOG_LEVEL=${LOG_LEVEL:-debug}
API_PORT=${API_PORT:-5500}
API_WORKERS=${API_WORKERS:-10}
1 change: 1 addition & 0 deletions backend/src/celery_background_jobs_worker_entrypoint.sh
@@ -5,6 +5,7 @@ source .venv/bin/activate

export OMP_NUM_THREADS=1
export MKL_NUM_THREADS=1
export BACKEND_TYPE="worker"

LOG_LEVEL=${LOG_LEVEL:-debug}
CELERY_BACKGROUND_JOBS_WORKER_CONCURRENCY=${CELERY_BACKGROUND_JOBS_WORKER_CONCURRENCY:-1}
12 changes: 9 additions & 3 deletions backend/src/main.py
@@ -61,8 +61,8 @@
SourceDocumentNotFoundInFilesystemError,
)

# import all endpoints dynamically
endpoint_modules = import_by_suffix("_endpoint.py")
# import all jobs dynamically
import_by_suffix("_jobs.py")


# custom method to generate OpenApi function names
@@ -223,7 +223,13 @@ def invalid_error_handler(_, exc: InvalidError):
return PlainTextResponse(str(exc), status_code=HTTPStatus.BAD_REQUEST)


# register all endpoints dynamically
# import & register all exception handlers dynamically
exception_handlers = import_by_suffix("_exception_handler.py")
for eh in exception_handlers:
eh.register_exception_handlers(app)

# import & register all endpoints dynamically
endpoint_modules = import_by_suffix("_endpoint.py")
endpoint_modules.sort(key=lambda x: x.__name__.split(".")[-1])
for em in endpoint_modules:
app.include_router(em.router)
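`import_by_suffix` is used here for jobs, exception handlers, and endpoints but is defined elsewhere in the repo. A rough sketch of what such a helper might look like, assuming it walks the source tree and imports every module whose file name matches the suffix (the implementation details are an assumption, not taken from this diff):

```python
import importlib
from pathlib import Path
from types import ModuleType


def import_by_suffix(suffix: str, root: str = "src") -> list[ModuleType]:
    """Import every module under `root` whose file name ends with `suffix`."""
    modules: list[ModuleType] = []
    root_path = Path(root)
    for path in sorted(root_path.rglob(f"*{suffix}")):
        # e.g. src/modules/analysis/analysis_endpoint.py -> modules.analysis.analysis_endpoint
        module_name = ".".join(path.relative_to(root_path).with_suffix("").parts)
        modules.append(importlib.import_module(module_name))
    return modules
```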
@@ -0,0 +1,39 @@
"""add indices to wordfrequency

Revision ID: 6f484bc7d629
Revises: 7c617b019b45
Create Date: 2025-07-30 15:26:13.260886

"""

from typing import Sequence

from alembic import op

# revision identifiers, used by Alembic.
revision: str = "6f484bc7d629"
down_revision: str | None = "7c617b019b45"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_index(
op.f("ix_wordfrequency_count"), "wordfrequency", ["count"], unique=False
)
op.create_index(
op.f("ix_wordfrequency_sdoc_id"), "wordfrequency", ["sdoc_id"], unique=False
)
op.create_index(
op.f("ix_wordfrequency_word"), "wordfrequency", ["word"], unique=False
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f("ix_wordfrequency_word"), table_name="wordfrequency")
op.drop_index(op.f("ix_wordfrequency_sdoc_id"), table_name="wordfrequency")
op.drop_index(op.f("ix_wordfrequency_count"), table_name="wordfrequency")
# ### end Alembic commands ###
17 changes: 0 additions & 17 deletions backend/src/modules/analysis/analysis_endpoint.py
@@ -15,7 +15,6 @@
compute_num_sdocs_with_date_metadata,
)
from modules.analysis.document_sampler import document_sampler_by_tags
from modules.analysis.duplicate_finder import find_duplicates
from sqlalchemy.orm import Session

router = APIRouter(
@@ -99,19 +98,3 @@ def sample_sdocs_by_tags(
return document_sampler_by_tags(
project_id=project_id, tag_ids=tag_groups, n=n, frac=frac
)


@router.post(
"/{proj_id}/find_duplicate_text_sdocs",
response_model=list[list[int]],
summary="Returns groups of duplicate sdoc ids.",
)
def find_duplicate_text_sdocs(
*,
db: Session = Depends(get_db_session),
proj_id: int,
max_different_words: int,
authz_user: AuthzUser = Depends(),
) -> list[list[int]]:
authz_user.assert_in_project(proj_id)
return find_duplicates(project_id=proj_id, max_different_words=max_different_words)
15 changes: 15 additions & 0 deletions backend/src/modules/duplicate_finder/duplicate_finder_dto.py
@@ -0,0 +1,15 @@
from pydantic import BaseModel, Field
from systems.job_system.job_dto import JobInputBase


class DuplicateFinderInput(JobInputBase):
project_id: int = Field(..., description="Project ID to search for duplicates")
max_different_words: int = Field(
..., description="Number of different words allowed between duplicates"
)


class DuplicateFinderOutput(BaseModel):
duplicates: list[list[int]] = Field(
..., description="List of found duplicate clusters"
)
@@ -4,20 +4,44 @@
import numpy as np
from common.doc_type import DocType
from loguru import logger
from modules.duplicate_finder.duplicate_finder_dto import (
DuplicateFinderInput,
DuplicateFinderOutput,
)
from modules.word_frequency.word_frequency_crud import crud_word_frequency
from repos.db.sql_repo import SQLRepo
from rq import get_current_job
from scipy import sparse
from sklearn.metrics.pairwise import manhattan_distances
from systems.job_system.job_dto import EndpointGeneration, JobPriority
from systems.job_system.job_register_decorator import register_job


def find_duplicates(project_id: int, max_different_words: int) -> list[list[int]]:
@register_job(
job_type="duplicate_finder",
input_type=DuplicateFinderInput,
output_type=DuplicateFinderOutput,
priority=JobPriority.DEFAULT,
generate_endpoints=EndpointGeneration.MINIMAL,
)
def find_duplicates_job(
payload: DuplicateFinderInput,
) -> DuplicateFinderOutput:
job = get_current_job()
assert job is not None, "Job must be running in a worker context"

job.meta["status_message"] = "Started duplicate finding"
job.save_meta()

logger.info("Finding duplicate text sdocs")
t0 = time.time()
with SQLRepo().db_session() as db:
result = crud_word_frequency.read_by_project_and_doctype(
db, project_id=project_id, doctype=DocType.text
db, project_id=payload.project_id, doctype=DocType.text
)
t1 = time.time()
job.meta["status_message"] = "Fetched word frequencies from database"
job.save_meta()
logger.info(f"query took: {t1 - t0}")

t0 = time.time()
@@ -51,6 +75,8 @@ def find_duplicates(project_id: int, max_different_words: int) -> list[list[int]
(values, (index, indices)), shape=(len(idx2sdoc_id), vocab_size)
)
t1 = time.time()
job.meta["status_message"] = "Created document word vectors"
job.save_meta()
logger.info(f"document vector creation took: {t1 - t0}")
logger.info(f"vocab size: {vocab_size}")
logger.info(f"document_vectors shape: {document_vectors.shape}")
@@ -59,6 +85,8 @@ def find_duplicates(project_id: int, max_different_words: int) -> list[list[int]
t0 = time.time()
word_dists = manhattan_distances(document_vectors, document_vectors)
t1 = time.time()
job.meta["status_message"] = "Computed distances between documents"
job.save_meta()
logger.info(f"manhatten distance took: {t1 - t0}")

# mask out self distances and one half of the matrix
@@ -69,9 +97,14 @@ def find_duplicates(project_id: int, max_different_words: int) -> list[list[int]
# find duplicates
t0 = time.time()
duplicate_pairs = np.transpose(
np.where((masked_word_dists <= max_different_words) & (masked_word_dists >= 0))
np.where(
(masked_word_dists <= payload.max_different_words)
& (masked_word_dists >= 0)
)
).tolist()
t1 = time.time()
job.meta["status_message"] = "Identified duplicate pairs"
job.save_meta()
logger.info(f"finding duplicates took: {t1 - t0}")

# map back to sdoc_ids
@@ -87,6 +120,8 @@ def find_duplicates(project_id: int, max_different_words: int) -> list[list[int]
G.to_undirected()
subgraph_nodes = [list(subgraph) for subgraph in nx.connected_components(G)]
t1 = time.time()
job.meta["status_message"] = "Finished finding duplicates!"
job.save_meta()
logger.info(f"graph grouping took: {t1 - t0}")

return subgraph_nodes
return DuplicateFinderOutput(duplicates=subgraph_nodes)
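With the function registered as a job, callers enqueue it instead of invoking it directly. A minimal sketch of enqueueing with plain RQ, assuming the payload is passed as the `payload` keyword argument (which is how `JobRead.from_rq_job` later reads `job.kwargs["payload"]`); the connection settings and queue name are assumptions for local development:

```python
from redis import Redis
from rq import Queue

from modules.duplicate_finder.duplicate_finder import find_duplicates_job
from modules.duplicate_finder.duplicate_finder_dto import DuplicateFinderInput

# Assumed local dev settings: REDIS_PORT 13124 and rq_idx 10 from development.yaml.
connection = Redis(host="localhost", port=13124, password="dats123", db=10)
queue = Queue("default", connection=connection)

# Enqueue the job; the worker process started by rq_worker_entrypoint.sh picks it up.
job = queue.enqueue(
    find_duplicates_job,
    payload=DuplicateFinderInput(project_id=1, max_different_words=3),
)
print(job.id, job.get_status())
```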
9 changes: 6 additions & 3 deletions backend/src/modules/word_frequency/word_frequency_orm.py
@@ -10,14 +10,17 @@

class WordFrequencyORM(ORMBase):
sdoc_id: Mapped[int] = mapped_column(
Integer, ForeignKey("sourcedocument.id", ondelete="CASCADE"), primary_key=True
Integer,
ForeignKey("sourcedocument.id", ondelete="CASCADE"),
primary_key=True,
index=True,
)
source_document: Mapped["SourceDocumentORM"] = relationship(
"SourceDocumentORM", back_populates="word_frequencies"
)

word: Mapped[str] = mapped_column(String, primary_key=True)
count: Mapped[int] = mapped_column(Integer)
word: Mapped[str] = mapped_column(String, primary_key=True, index=True)
count: Mapped[int] = mapped_column(Integer, index=True)

def get_project_id(self) -> int:
return self.source_document.get_project_id()
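The new indices on `sdoc_id`, `word`, and `count` back the per-project word-frequency scan that the duplicate finder performs via `crud_word_frequency.read_by_project_and_doctype`. A rough sketch of that kind of query, assuming the join goes through `SourceDocumentORM`; the import path and column names (`project_id`, `doctype`) are guesses, not taken from this diff:

```python
from sqlalchemy.orm import Session

from common.doc_type import DocType
from modules.word_frequency.word_frequency_orm import WordFrequencyORM
# Hypothetical import path for the source document ORM.
from core.data.orm.source_document import SourceDocumentORM


def read_by_project_and_doctype(
    db: Session, *, project_id: int, doctype: DocType
) -> list[WordFrequencyORM]:
    # ix_wordfrequency_sdoc_id speeds up the join; ix_wordfrequency_word and
    # ix_wordfrequency_count help later filtering/sorting on those columns.
    return (
        db.query(WordFrequencyORM)
        .join(SourceDocumentORM, WordFrequencyORM.sdoc_id == SourceDocumentORM.id)
        .filter(
            SourceDocumentORM.project_id == project_id,
            SourceDocumentORM.doctype == doctype,
        )
        .all()
    )
```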
8 changes: 8 additions & 0 deletions backend/src/rq_worker_entrypoint.sh
@@ -0,0 +1,8 @@
#!/bin/bash

set -e
source .venv/bin/activate

export BACKEND_TYPE="worker"

python src/worker.py work
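The entrypoint calls `python src/worker.py work`, and `worker.py` itself is not part of this diff. A plausible minimal version, assuming it simply attaches an RQ `Worker` to the configured queues (queue names and connection details are assumptions):

```python
# worker.py -- hypothetical sketch; the real file is not shown in this PR.
import sys

from redis import Redis
from rq import Queue, Worker


def work() -> None:
    # Assumed local dev settings matching development.yaml (rq_idx: 10).
    connection = Redis(host="localhost", port=13124, password="dats123", db=10)
    queues = [Queue(name, connection=connection) for name in ("high", "default", "low")]
    # Blocks and processes jobs until the process is stopped.
    Worker(queues, connection=connection).work()


if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "work":
        work()
```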
59 changes: 59 additions & 0 deletions backend/src/systems/job_system/job_dto.py
@@ -0,0 +1,59 @@
import enum
from typing import Generic, TypeVar

import rq
from pydantic import BaseModel, Field


# See https://python-rq.org/docs/jobs/
class JobStatus(str, enum.Enum):
QUEUED = "queued"
FINISHED = "finished"
FAILED = "failed"
STARTED = "started"
DEFERRED = "deferred"
SCHEDULED = "scheduled"
STOPPED = "stopped"
CANCELED = "canceled"


class JobPriority(str, enum.Enum):
LOW = "low"
DEFAULT = "default"
HIGH = "high"


class EndpointGeneration(str, enum.Enum):
ALL = "all" # Generate all endpoints
MINIMAL = "minimal" # Generate start and get_by_id endpoints
NONE = "none" # Do not generate any endpoints


class JobInputBase(BaseModel):
project_id: int = Field(..., description="Project ID associated with the job")


InputT = TypeVar("InputT", bound=JobInputBase)
OutputT = TypeVar("OutputT", bound=BaseModel)


class JobRead(BaseModel, Generic[InputT, OutputT]):
job_id: str = Field(..., description="RQ job ID")
job_type: str = Field(..., description="Type of the job")
project_id: int = Field(..., description="Project ID associated with the job")
status: JobStatus = Field(..., description="Current status of the job")
status_message: str | None = Field(None, description="Status message")
input: InputT = Field(..., description="Input for the job")
output: OutputT | None = Field(None, description="Output for the job")

@staticmethod
def from_rq_job(job: rq.job.Job) -> "JobRead[InputT, OutputT]":
return JobRead(
job_id=job.id,
job_type=job.meta.get("type", "unknown"),
project_id=job.meta.get("project_id", 0),
status=JobStatus(job.get_status()),
status_message=job.meta.get("status_message", ""),
input=job.kwargs["payload"],
output=job.return_value(),
)
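A short usage sketch for `JobRead.from_rq_job`, e.g. inside a polling helper that an API endpoint could call; the function name and connection handling are illustrative, not part of this diff:

```python
from redis import Redis
from rq.exceptions import NoSuchJobError
from rq.job import Job

from systems.job_system.job_dto import JobRead


def read_job(job_id: str, connection: Redis) -> JobRead | None:
    """Fetch an RQ job by id and convert it to the API-facing JobRead DTO."""
    try:
        job = Job.fetch(job_id, connection=connection)
    except NoSuchJobError:
        return None
    return JobRead.from_rq_job(job)


# Example:
# conn = Redis(host="localhost", port=13124, password="dats123", db=10)
# dto = read_job("some-rq-job-id", conn)
# if dto is not None:
#     print(dto.status, dto.status_message)
```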