Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compatibility with RQ 2.0.0 #495

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ itsdangerous==2.1.2 # via flask
jinja2==3.1.2 # via flask
markupsafe==2.1.3 # via jinja2
python-dateutil==2.8.2 # via arrow
redis==4.6.0
rq==1.15.1
redis==5.2.0
rq==2.0.0
six==1.16.0
werkzeug==2.3.7
Redis-Sentinel-Url==1.0.1
59 changes: 28 additions & 31 deletions rq_dashboard/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
VERSION as rq_version,
Queue,
Worker,
pop_connection,
push_connection,
requeue_job,
)
from rq.exceptions import NoSuchJobError
Expand Down Expand Up @@ -96,13 +94,11 @@ def push_rq_connection():
raise LookupError("Index exceeds RQ list. Not Permitted.")
else:
new_instance = current_app.redis_conn
push_connection(new_instance)

current_app.redis_conn = new_instance


@blueprint.teardown_request
def pop_rq_connection(exception=None):
pop_connection()



def jsonify(f):
Expand Down Expand Up @@ -137,7 +133,7 @@ def serialize_queues(instance_number, queues):
order="asc",
page="1",
),
failed_job_registry_count=FailedJobRegistry(q.name).count,
failed_job_registry_count=FailedJobRegistry(q.name, connection=q.connection).count,
failed_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -147,7 +143,7 @@ def serialize_queues(instance_number, queues):
order="asc",
page="1",
),
started_job_registry_count=StartedJobRegistry(q.name).count,
started_job_registry_count=StartedJobRegistry(q.name, connection=q.connection).count,
started_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -157,7 +153,7 @@ def serialize_queues(instance_number, queues):
order="asc",
page="1",
),
deferred_job_registry_count=DeferredJobRegistry(q.name).count,
deferred_job_registry_count=DeferredJobRegistry(q.name, connection=q.connection).count,
deferred_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -167,7 +163,7 @@ def serialize_queues(instance_number, queues):
order="asc",
page="1",
),
finished_job_registry_count=FinishedJobRegistry(q.name).count,
finished_job_registry_count=FinishedJobRegistry(q.name, connection=q.connection).count,
finished_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -177,7 +173,7 @@ def serialize_queues(instance_number, queues):
order="asc",
page="1",
),
canceled_job_registry_count=CanceledJobRegistry(q.name).count,
canceled_job_registry_count=CanceledJobRegistry(q.name, connection=q.connection).count,
canceled_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -187,7 +183,7 @@ def serialize_queues(instance_number, queues):
order="asc",
page="1",
),
scheduled_job_registry_count=ScheduledJobRegistry(q.name).count,
scheduled_job_registry_count=ScheduledJobRegistry(q.name, connection=q.connection).count,
scheduled_url=url_for(
".jobs_overview",
instance_number=instance_number,
Expand All @@ -208,12 +204,13 @@ def serialize_date(dt):
return arrow.get(dt).to("UTC").datetime.isoformat()


def serialize_job(job):
def serialize_job(job: Job):
latest_result = job.latest_result()
return dict(
id=job.id,
created_at=serialize_date(job.created_at),
ended_at=serialize_date(job.ended_at),
exc_info=str(job.exc_info) if job.exc_info else None,
exc_info=str(latest_result.exc_string) if latest_result else None,
description=job.description,
)

Expand Down Expand Up @@ -255,24 +252,24 @@ def favicon():
)


def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page, order):
queue = Queue(queue_name, serializer=config.serializer)
def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page, order, connection):
queue = Queue(queue_name, serializer=config.serializer, connection=connection)
if registry_name != "queued":
if per_page >= 0:
per_page = offset + (per_page - 1)

if registry_name == "failed":
current_queue = FailedJobRegistry(queue_name)
current_queue = FailedJobRegistry(queue_name, connection=connection)
elif registry_name == "deferred":
current_queue = DeferredJobRegistry(queue_name)
current_queue = DeferredJobRegistry(queue_name, connection=connection)
elif registry_name == "started":
current_queue = StartedJobRegistry(queue_name)
current_queue = StartedJobRegistry(queue_name, connection=connection)
elif registry_name == "finished":
current_queue = FinishedJobRegistry(queue_name)
current_queue = FinishedJobRegistry(queue_name, connection=connection)
elif registry_name == "scheduled":
current_queue = ScheduledJobRegistry(queue_name)
current_queue = ScheduledJobRegistry(queue_name, connection=connection)
elif registry_name == "canceled":
current_queue = CanceledJobRegistry(queue_name)
current_queue = CanceledJobRegistry(queue_name, connection=connection)
else:
current_queue = queue
total_items = current_queue.count
Expand Down Expand Up @@ -314,7 +311,7 @@ def queues_overview(instance_number):
"rq_dashboard/queues.html",
current_instance=instance_number,
instance_list=escape_format_instance_list(current_app.config.get("RQ_DASHBOARD_REDIS_URL")),
queues=Queue.all(),
queues=Queue.all(connection=current_app.redis_conn),
rq_url_prefix=url_for(".queues_overview"),
rq_dashboard_version=rq_dashboard_version,
rq_version=rq_version,
Expand Down Expand Up @@ -368,15 +365,15 @@ def workers_overview(instance_number):
)
def jobs_overview(instance_number, queue_name, registry_name, per_page, order, page):
if queue_name is None:
queue = Queue(serializer=config.serializer)
queue = Queue(serializer=config.serializer, connection=current_app.redis_conn)
else:
queue = Queue(queue_name, serializer=config.serializer)
queue = Queue(queue_name, serializer=config.serializer, connection=current_app.redis_conn)
r = make_response(
render_template(
"rq_dashboard/jobs.html",
current_instance=instance_number,
instance_list=escape_format_instance_list(current_app.config.get("RQ_DASHBOARD_REDIS_URL")),
queues=Queue.all(),
queues=Queue.all(connection=current_app.redis_conn),
queue=queue,
per_page=per_page,
order=order,
Expand All @@ -398,7 +395,7 @@ def jobs_overview(instance_number, queue_name, registry_name, per_page, order, p

@blueprint.route("/<int:instance_number>/view/job/<job_id>")
def job_view(instance_number, job_id):
job = Job.fetch(job_id)
job = Job.fetch(job_id, connection=current_app.redis_conn)
r = make_response(
render_template(
"rq_dashboard/job.html",
Expand All @@ -423,7 +420,7 @@ def job_view(instance_number, job_id):
@jsonify
def delete_job_view(job_id, registry=None):
try:
job = Job.fetch(job_id)
job = Job.fetch(job_id, connection=current_app.redis_conn)
job.delete()
except NoSuchJobError:
if registry:
Expand Down Expand Up @@ -497,7 +494,7 @@ def compact_queue(queue_name):
@blueprint.route("/<int:instance_number>/data/queues.json")
@jsonify
def list_queues(instance_number):
queues = serialize_queues(instance_number, sorted(Queue.all()))
queues = serialize_queues(instance_number, sorted(Queue.all(connection=current_app.redis_conn)))
return dict(queues=queues)


Expand All @@ -511,7 +508,7 @@ def list_jobs(instance_number, queue_name, registry_name, per_page, order, page)
offset = (current_page - 1) * per_page

total_items, jobs = get_queue_registry_jobs_count(
queue_name, registry_name, offset, per_page, order
queue_name, registry_name, offset, per_page, order, current_app.redis_conn
)

pages_numbers_in_window = pagination_window(total_items, current_page, per_page)
Expand Down Expand Up @@ -652,7 +649,7 @@ def serialize_queue_names(worker):
version=getattr(worker, "version", ""),
python_version=getattr(worker, "python_version", ""),
)
for worker in Worker.all()
for worker in Worker.all(connection=current_app.redis_conn)
),
key=lambda w: (w["state"], w["queues"], w["name"]),
)
Expand Down
9 changes: 4 additions & 5 deletions tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import unittest

import redis
from rq import Queue, Worker, pop_connection, push_connection
from rq import Queue, Worker

from rq_dashboard.cli import make_flask_app
from rq_dashboard.web import escape_format_instance_list
Expand All @@ -24,13 +24,12 @@ def setUp(self):
self.app.testing = True
self.app.config['RQ_DASHBOARD_REDIS_URL'] = ['redis://127.0.0.1']
self.app.redis_conn = self.get_redis_client()
push_connection(self.get_redis_client())
self.client = self.app.test_client()

def tearDown(self):
q = Queue(connection=self.app.redis_conn)
q.empty()
pop_connection()


def test_dashboard_ok(self):
response = self.client.get('/')
Expand Down Expand Up @@ -89,7 +88,7 @@ def test_registry_jobs_list(self):
self.assertIn('jobs', data)

def test_worker_python_version_field(self):
w = Worker(['q'])
w = Worker(['q'], connection=self.app.redis_conn)
w.register_birth()
response = self.client.get('/0/data/workers.json')
data = json.loads(response.data.decode('utf8'))
Expand All @@ -100,7 +99,7 @@ def test_worker_python_version_field(self):
w.register_death()

def test_worker_version_field(self):
w = Worker(['q'])
w = Worker(['q'], connection=self.app.redis_conn)
w.register_birth()
response = self.client.get('/0/data/workers.json')
data = json.loads(response.data.decode('utf8'))
Expand Down
Loading