fix CelerySignalProcessor delete handling #473

Open

Bragegs wants to merge 2 commits into master

Conversation

Bragegs commented Feb 2, 2024

  1. Removed an unnecessary comma from the assignment of the `bulk_data` variable.
  2. Renamed the `registry_delete_task` function parameter `data` to `bulk_data`.
  3. Added `doc_module` to the `registry_delete_task` function parameters so that we can initialize the registered document correctly (see the sketch below).
  4. On deletion (1): a list of related instances is now prepared, and only their ids are sent to the task that updates the related instances. The reason for this change is that if we serialize these instances during preparation, the instance being deleted has not actually been deleted yet (pre-delete), so any data the related instances had connected to the will-be-deleted instance would stick around instead of being removed. By serializing the related instances after the instance has actually been deleted, the related documents are serialized and updated correctly in `registry_delete_related_task`.
  5. On deletion (2): the instance being deleted is now prepared and deleted in `registry_delete_task`. Related instances are no longer handled here as they were before; that is covered by (1).

Fixes #472
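For context, here is a minimal sketch (not the PR's verbatim diff) of the shape `registry_delete_task` takes once points 2 and 3 are applied. The `doc_label` parameter name and the example module path are assumptions for illustration; `_bulk(bulk_data, parallel=...)` is the existing django_elasticsearch_dsl call that also appears in the traceback further down.

```python
from importlib import import_module

from celery import shared_task


@shared_task()
def registry_delete_task(doc_module, doc_label, bulk_data):
    """Re-create the registered Document inside the worker, then run the bulk op.

    Importing only the class name (e.g. 'MyIndex') raises ModuleNotFoundError;
    importing the defining module and pulling the class off it lets the worker
    resolve the document correctly.
    """
    doc_class = getattr(import_module(doc_module), doc_label)  # doc_label is assumed
    doc_instance = doc_class()
    doc_instance._bulk(bulk_data, parallel=True)
```

A caller would then enqueue something like `registry_delete_task.delay('myapp.documents', 'MyIndex', bulk_data)` (hypothetical names) instead of passing only the class name.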

Commits:

1. Removed unnecessary comma from setter of `bulk_data` variable.
2. Add `doc_module` to `registry_delete_task` function parameters so that we can init the registered document correctly.

Teachmetech commented Feb 7, 2024

@Bragegs After trying your changes I get the following error on DigitalOcean. Is there a way to avoid this?

  File "/workspace/.heroku/python/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/site-packages/celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/api/signals/django_elasticsearch_dsl_custom_celery_signal_handler.py", line 83, in registry_delete_task
    doc_instance._bulk(bulk_data, parallel=parallel)
  File "/workspace/.heroku/python/lib/python3.11/site-packages/django_elasticsearch_dsl/documents.py", line 234, in _bulk
    return self.parallel_bulk(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/site-packages/django_elasticsearch_dsl/documents.py", line 194, in parallel_bulk
    deque(bulk_actions, maxlen=0)
  File "/workspace/.heroku/python/lib/python3.11/site-packages/elasticsearch/helpers/actions.py", line 592, in parallel_bulk
    pool = BlockingPool(thread_count)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/pool.py", line 930, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
                            ^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
                  ^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/workspace/.heroku/python/lib/python3.11/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
                         ^^^^^^^^^^^^^^^^^^^^^^^^^
OSError: [Errno 38] Function not implemented
```

Bragegs (Author) commented Feb 9, 2024

@Teachmetech could this be a DigitalOcean issue?

https://stackoverflow.com/a/67347498

https://www.digitalocean.com/community/tutorials/python-multiprocessing-example
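
For what it's worth, the traceback above points at `elasticsearch.helpers.parallel_bulk`, which builds a multiprocessing-backed thread pool; platforms without working POSIX semaphores (as described in the links) fail as soon as the pool tries to create a `SemLock`. A workaround sketch, not part of this PR, is to pin documents to the serial bulk path:

```python
# Workaround sketch (an assumption, not this PR's code): force parallel=False so
# Document._bulk() dispatches to the plain elasticsearch.helpers.bulk helper and
# never touches parallel_bulk's multiprocessing ThreadPool.
from django_elasticsearch_dsl import Document


class SerialBulkDocument(Document):
    """Base class for documents running on hosts without POSIX semaphore support."""

    def _bulk(self, *args, **kwargs):
        kwargs["parallel"] = False  # override any parallel=True passed by the task
        return super()._bulk(*args, **kwargs)
```

Passing `parallel=False` directly at the call site shown in the traceback has the same effect.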

Teachmetech commented

Interesting, thanks for that info! Is it possible to work around this issue by having the deletes go through the RealTimeSignalProcessor and the rest of the signals go through the Celery processor? I have a task that runs nightly and schedules a few million objects to be updated, and the signals slow it down quite a bit. I'm trying to minimize the time updates take by creating a separate queue and workers specifically to handle the ES signals. Everything is succeeding except for the delete signals. Another option would be to run an ES sync after the updates are completed; however, I'm not sure how to do that without wiping the data, and I don't know whether any of the management commands support that.
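
On the routing question, one possible approach is a small subclass that keeps saves asynchronous but handles deletes synchronously. This is only a sketch; it assumes `CelerySignalProcessor` connects the same `handle_pre_delete` / `handle_delete` hooks that `RealTimeSignalProcessor` inherits from `BaseSignalProcessor`, which is worth verifying against the installed version:

```python
# Hybrid sketch: saves/updates still go through Celery, deletes run immediately the
# way the real-time processor would run them. Hook names are assumed to come from
# django_elasticsearch_dsl.signals.BaseSignalProcessor; confirm before relying on it.
from django_elasticsearch_dsl.signals import BaseSignalProcessor, CelerySignalProcessor


class CelerySaveRealTimeDeleteProcessor(CelerySignalProcessor):
    """Asynchronous saves, synchronous deletes."""

    def handle_pre_delete(self, sender, instance, **kwargs):
        # Handle related documents synchronously (real-time behaviour).
        BaseSignalProcessor.handle_pre_delete(self, sender, instance, **kwargs)

    def handle_delete(self, sender, instance, **kwargs):
        # Remove the instance's own document synchronously (real-time behaviour).
        BaseSignalProcessor.handle_delete(self, sender, instance, **kwargs)
```

Pointing `ELASTICSEARCH_DSL_SIGNAL_PROCESSOR` at such a class in settings would route only the delete path around Celery.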

(Screenshot attached: 2024-02-09, 2:11 AM)

This version seems to actually delete all indexes and related document instances correctly.

Successfully merging this pull request may close these issues:

ModuleNotFoundError: No module named 'MyIndex'