Skip to content

Commit

Permalink
Add multiple queues capabilities
Browse files (browse the repository at this point in the history)
  • Loading branch information
gabriel-piles committed Sep 19, 2024
1 parent: 9205e45 · commit: 3f3bac4
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 109 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ ENV DEBIAN_FRONTEND noninteractive

RUN apt-get update && \
apt-get -y -q --no-install-recommends install \
git \
wget \
cabextract \
xfonts-utils \
Expand Down
3 changes: 1 addition & 2 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
redis==4.4.0
git+https://github.com/huridocs/queue-processor@bab1f4419b0768df518d06795afd5df2ba0e331c
pydantic==1.10.2
sentry-sdk==1.11.1
PyRSMQ==0.4.5
graypy==2.1.0
fastapi==0.88.0
sentry-sdk[fastapi]==1.11.1
Expand Down
2 changes: 1 addition & 1 deletion src/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ async def upload_document(namespace: str, file: UploadFile):

@app.get("/processed_pdf/{namespace}/{pdf_file_name}", response_class=FileResponse)
async def processed_pdf(
namespace: str, pdf_file_name: str, background_tasks: BackgroundTasks
namespace: str, pdf_file_name: str, background_tasks: BackgroundTasks
):
try:
file_path = f'{CONFIG["processed_pdfs"]}/{namespace}/{pdf_file_name}'
Expand Down
4 changes: 3 additions & 1 deletion src/api/supported_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,6 @@ class FileNotSupported(Exception):
def check_file_support(filename: str):
mimetype, _ = mimetypes.guess_type(filename)
if mimetype not in MIMETYPES:
raise FileNotSupported(f"File not supported for file {filename} and mimetype {mimetype}")
raise FileNotSupported(
f"File not supported for file {filename} and mimetype {mimetype}"
)
15 changes: 10 additions & 5 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@
DOCUMENT_SOURCES_PATH = f"{DATA_PATH}/source_documents"
PDF_PROCESSED_PATH = f"{DATA_PATH}/processed_pdfs"
DOCUMENT_FAILED = f"{DATA_PATH}/failed_documents"
GRAYLOG_IP = os.environ.get('GRAYLOG_IP')
GRAYLOG_IP = os.environ.get("GRAYLOG_IP")

REDIS_HOST = os.environ.get("REDIS_HOST", "127.0.0.1")
REDIS_PORT = os.environ.get("REDIS_PORT", "6379")

QUEUES_NAMES = os.environ.get("QUEUES_NAMES", "convert-to-pdf")

CONFIG = {
"source_documents": DOCUMENT_SOURCES_PATH,
Expand All @@ -18,13 +23,13 @@

handlers = [logging.StreamHandler()]
if GRAYLOG_IP:
handlers.append(graypy.GELFUDPHandler(
GRAYLOG_IP, 12201, localname="convert-to-pdf"
))
handlers.append(
graypy.GELFUDPHandler(GRAYLOG_IP, 12201, localname="convert-to-pdf")
)

logging.root.handlers = []
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=handlers
handlers=handlers,
)
123 changes: 42 additions & 81 deletions src/worker/queue_processor.py
Original file line number Diff line number Diff line change
@@ -1,109 +1,69 @@
import logging
import os

import redis
from pydantic import ValidationError
from rsmq.consumer import RedisSMQConsumer
from rsmq import RedisSMQ
from queue_processor.QueueProcessor import QueueProcessor
from sentry_sdk import start_transaction
from sentry_sdk.integrations.redis import RedisIntegration
import sentry_sdk

from src.config import REDIS_HOST, REDIS_PORT, QUEUES_NAMES
from .models import Message, Task
from .convert_to_pdf import convert_to_pdf

SERVICE_NAME = "convert-to-pdf"
RESULTS_QUEUE_NAME = f"{SERVICE_NAME}_results"
TASKS_QUEUE_NAME = f"{SERVICE_NAME}_tasks"
SERVICE_URL = f"{os.environ.get('SERVICE_HOST')}:{os.environ.get('SERVICE_PORT')}"


class QueueProcessor:
def __init__(self):
self.logger = logging.getLogger(__name__)
self.redis_host = os.environ.get("REDIS_HOST", "localhost")
self.redis_port = os.environ.get("REDIS_PORT", "6739")
self.results_queue = RedisSMQ(
host=self.redis_host,
port=self.redis_port,
qname=RESULTS_QUEUE_NAME,
)
self.tasks_queue = RedisSMQ(host=self.redis_host, port=self.redis_port, qname=TASKS_QUEUE_NAME, )

def create_queues(self):
self.logger.info("Creating Redis queues")
def process(message):
logger = logging.getLogger(__name__)
with start_transaction(op="task", name="convert_to_pdf"):
try:
self.results_queue.createQueue().vt(120).exceptions(False).execute()
self.tasks_queue.createQueue().vt(120).exceptions(False).execute()
except Exception:
self.logger.exception("Error creating Redis queues")

def process(self, id, message, rc, ts):
with start_transaction(op="task", name='convert_to_pdf'):
try:
task = Task(**message)
except ValidationError:
self.logger.error(f"Not a valid message: {message}")
return True

self.logger.info(f"Valid message: {message}")

try:
self.logger.info(f"Converting to PDF {task.params.filename}")
processed_pdf_filepath = convert_to_pdf(
task.params.filename, task.params.namespace
)

self.logger.info(f"Converted to PDF {task.params.filename}")
task = Task(**message)
except ValidationError:
logger.error(f"Not a valid message: {message}")
return None

if not processed_pdf_filepath:
message = Message(
namespace=task.params.namespace,
task=task.task,
params=task.params,
success=False,
error_message="Error during pdf convert",
)
self.logger.error(f"Error during pdf convert {task.params.filename}")
logger.info(f"Valid message: {message}")

self.results_queue.sendMessage().message(message.dict()).execute()
self.logger.error(message.json())
return True
try:
logger.info(f"Converting to PDF {task.params.filename}")
processed_pdf_filepath = convert_to_pdf(
task.params.filename, task.params.namespace
)

file_name = "".join(task.params.filename.split(".")[:-1])
processed_pdf_url = (
f"{SERVICE_URL}/processed_pdf/{task.params.namespace}/{file_name}.pdf"
)
logger.info(f"Converted to PDF {task.params.filename}")

if not processed_pdf_filepath:
message = Message(
namespace=task.params.namespace,
task=task.task,
params=task.params,
success=True,
file_url=processed_pdf_url,
success=False,
error_message="Error during pdf convert",
)
logger.error(f"Error during pdf convert {task.params.filename}")
logger.error(message.json())
return message.dict()

self.logger.info(message.json())
self.results_queue.sendMessage(delay=3).message(message.dict()).execute()
return True
except Exception as exception:
self.logger.exception(exception)
return True

def run(self):
try:
redis_smq_consumer = RedisSMQConsumer(
qname=TASKS_QUEUE_NAME,
processor=self.process,
host=self.redis_host,
port=self.redis_port,
file_name = "".join(task.params.filename.split(".")[:-1])
processed_pdf_url = (
f"{SERVICE_URL}/processed_pdf/{task.params.namespace}/{file_name}.pdf"
)
redis_smq_consumer.run()
except redis.exceptions.ConnectionError:
self.logger.exception(
f"Error connecting to Redis: {self.redis_host}:{self.redis_port}"

message = Message(
namespace=task.params.namespace,
task=task.task,
params=task.params,
success=True,
file_url=processed_pdf_url,
)

logger.info(message.json())
return message.dict()
except Exception as exception:
logger.exception(exception)
return None


if __name__ == "__main__":
try:
Expand All @@ -116,6 +76,7 @@ def run(self):
except Exception:
pass

redis_tasks_processor = QueueProcessor()
redis_tasks_processor.create_queues()
redis_tasks_processor.run()
logger = logging.getLogger(__name__)
queues_names = QUEUES_NAMES.split(" ")
queue_processor = QueueProcessor(REDIS_HOST, REDIS_PORT, queues_names, logger)
queue_processor.start(process)
40 changes: 21 additions & 19 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def test_happy_path(self):
requests.post(f"{service_url}/upload/{namespace}", files=files)

message = self.get_redis_message()
if message.get('error_message'):
if message.get("error_message"):
self.fail(message)
response = requests.get(message["file_url"])

Expand All @@ -34,7 +34,7 @@ def test_happy_path(self):
pdf = PdfReader(io.BytesIO(response.content))
text = pdf.pages[0].extract_text()

self.assertEqual(message['namespace'], 'tenant-name')
self.assertEqual(message["namespace"], "tenant-name")
self.assertIsNotNone(pdf)
self.assertIn("Lorem ipsum", text)

Expand All @@ -50,22 +50,24 @@ def test_errors(self):

self.assertEqual(422, response.status_code)

@parameterized.expand([
["csv"],
["txt"],
["doc"],
["docx"],
["csv"],
["html"],
["odt"],
["epub"],
["xls"],
["xlsx"],
["ods"],
["ppt"],
["pptx"],
["odt"],
])
@parameterized.expand(
[
["csv"],
["txt"],
["doc"],
["docx"],
["csv"],
["html"],
["odt"],
["epub"],
["xls"],
["xlsx"],
["ods"],
["ppt"],
["pptx"],
["odt"],
]
)
def test_allowed_mimetypes(self, extension):
try:
check_file_support(f"filename.{extension}")
Expand All @@ -85,7 +87,7 @@ def get_redis_message():
queue.deleteMessage(id=message["id"]).execute()
return json.loads(message["message"])

return json.loads('{}')
return json.loads("{}")


if __name__ == "__main__":
Expand Down

0 comments on commit 3f3bac4

Please sign in to comment.