-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcelery_bootsteps.py
33 lines (26 loc) · 1.26 KB
/
celery_bootsteps.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
"""Celery bootsteps for this project.

Bootsteps let us perform custom actions at different
stages in the worker.
"""
from celery import bootsteps
from queues import get_queue_for_delayed_task_delivery
class DeclareQueueAndExchangeForDelayedTaskDelivery(bootsteps.StartStopStep):
    """Worker bootstep that declares the queues used for delayed task delivery.

    On start, the step looks up the queues associated with the
    ``"delayed-add-tasks"`` destination and declares both the destination
    queue and the temporary delayed-delivery queue, in support of
    "Scheduling celery Task".
    """

    # A bootstep is a custom class defining hooks that run custom actions at
    # different stages in the worker; subclassing ``bootsteps.StartStopStep``
    # is how this extra functionality is attached to the worker.
    #
    # This step declares a dependency on the Pool bootstep, i.e. the worker's
    # current process/eventlet/gevent/thread pool.
    requires = {"celery.worker.components:Pool"}

    def start(self, worker):
        """Declare the destination and temporary delayed-delivery queues.

        Args:
            worker: The worker this step is attached to. Only
                ``worker.app.pool`` is used, to obtain a connection.
        """
        # NOTE(review): the helper's return value is only known here by the
        # two attributes read below (presumably kombu ``Queue`` objects) --
        # confirm against ``queues.get_queue_for_delayed_task_delivery``.
        delivery_queues = get_queue_for_delayed_task_delivery(
            destination_queue_name="delayed-add-tasks"
        )

        # Both declarations share a single connection taken from the app's pool.
        with worker.app.pool.acquire() as connection:
            # Destination queue first, then the temporary queue.
            destination = delivery_queues.destination_queue
            destination.bind(connection).declare()

            holding = delivery_queues.temp_queue_for_delayed_delivery
            holding.bind(connection).declare()