From c4b26e0b66da73dd1a0287bb5550085756f5d4a6 Mon Sep 17 00:00:00 2001 From: Ludwig Kristoffersson Date: Sun, 23 Apr 2023 23:15:50 +0200 Subject: [PATCH] fix race conditions in parse image upload pipeline where concurrent jobs would overwrite data --- jobs/pipelines/parse_image_upload/classify_subjects.py | 2 ++ jobs/pipelines/parse_image_upload/create_description.py | 2 ++ jobs/pipelines/parse_image_upload/create_search_queries.py | 2 ++ jobs/pipelines/parse_image_upload/create_title.py | 2 ++ jobs/pipelines/parse_image_upload/parse_image_content.py | 3 +++ 5 files changed, 11 insertions(+) diff --git a/jobs/pipelines/parse_image_upload/classify_subjects.py b/jobs/pipelines/parse_image_upload/classify_subjects.py index 61d70e8f..4f2fb9c5 100644 --- a/jobs/pipelines/parse_image_upload/classify_subjects.py +++ b/jobs/pipelines/parse_image_upload/classify_subjects.py @@ -22,6 +22,7 @@ def job(image_id: str): '''.strip() labels = classifier.classify(text) + upload.refresh() for label in labels: upload.add_subject(label) @@ -30,6 +31,7 @@ def job(image_id: str): upload.save() except Exception as e: + upload.refresh() upload.classify_subjects_ok = False upload.classify_subjects_failure_reason = str(e) upload.save() diff --git a/jobs/pipelines/parse_image_upload/create_description.py b/jobs/pipelines/parse_image_upload/create_description.py index ee438d7b..021785a1 100644 --- a/jobs/pipelines/parse_image_upload/create_description.py +++ b/jobs/pipelines/parse_image_upload/create_description.py @@ -33,6 +33,7 @@ def job(image_id: str, language: str): upload_id=upload.id, ) + upload.refresh() if language == ImageUpload.Language.ENGLISH: upload.description_en = response upload.create_description_en_ok = True @@ -46,6 +47,7 @@ def job(image_id: str, language: str): upload.save() except Exception as e: + upload.refresh() if language == ImageUpload.Language.ENGLISH: upload.create_description_en_ok = False upload.create_description_en_failure_reason = str(e) diff --git a/jobs/pipelines/parse_image_upload/create_search_queries.py b/jobs/pipelines/parse_image_upload/create_search_queries.py index b11f9745..029bdd1e 100644 --- a/jobs/pipelines/parse_image_upload/create_search_queries.py +++ b/jobs/pipelines/parse_image_upload/create_search_queries.py @@ -44,6 +44,7 @@ def job(image_id: str, language: str): logger.info(f'query {language}: {query}') queries.append(query) + upload.refresh() if language == upload.Language.ENGLISH: upload.search_queries_en = json.dumps(queries) upload.create_search_queries_en_ok = True @@ -57,6 +58,7 @@ def job(image_id: str, language: str): else: raise ValueError(f'language {language} is not supported') except Exception as e: + upload.refresh() if language == upload.Language.ENGLISH: upload.create_search_queries_en_ok = False upload.create_search_queries_en_failure_reason = str(e) diff --git a/jobs/pipelines/parse_image_upload/create_title.py b/jobs/pipelines/parse_image_upload/create_title.py index 2a630d95..b4778ea9 100644 --- a/jobs/pipelines/parse_image_upload/create_title.py +++ b/jobs/pipelines/parse_image_upload/create_title.py @@ -27,12 +27,14 @@ def job(image_id: str): upload_id=upload.id, ) + upload.refresh() upload.title = response upload.create_title_ok = True upload.create_title_failure_reason = None upload.save() except Exception as e: + upload.refresh() upload.create_title_ok = False upload.create_title_failure_reason = str(e) upload.save() diff --git a/jobs/pipelines/parse_image_upload/parse_image_content.py b/jobs/pipelines/parse_image_upload/parse_image_content.py index aa35f58e..be991ac5 100644 --- a/jobs/pipelines/parse_image_upload/parse_image_content.py +++ b/jobs/pipelines/parse_image_upload/parse_image_content.py @@ -17,6 +17,8 @@ def job(image_id: str): try: text_content, request = get_text_content(make_img_url(upload)) + + upload.refresh() upload.text_content = text_content upload.parse_image_content_ok = True upload.parse_image_content_failure_reason = None @@ -26,6 +28,7 @@ def job(image_id: str): request.save() except Exception as e: + upload.refresh() upload.parse_image_content_ok = False upload.parse_image_content_failure_reason = str(e) upload.save()