diff --git a/requirements.txt b/requirements.txt index 9538698..cf870f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,8 @@ itsdangerous==2.1.2 # via flask jinja2==3.1.2 # via flask markupsafe==2.1.3 # via jinja2 python-dateutil==2.8.2 # via arrow -redis==4.6.0 -rq==1.15.1 +redis==5.2.0 +rq==2.0.0 six==1.16.0 werkzeug==2.3.7 Redis-Sentinel-Url==1.0.1 diff --git a/rq_dashboard/web.py b/rq_dashboard/web.py index 6f79629..d4deb57 100644 --- a/rq_dashboard/web.py +++ b/rq_dashboard/web.py @@ -38,8 +38,6 @@ VERSION as rq_version, Queue, Worker, - pop_connection, - push_connection, requeue_job, ) from rq.exceptions import NoSuchJobError @@ -96,13 +94,11 @@ def push_rq_connection(): raise LookupError("Index exceeds RQ list. Not Permitted.") else: new_instance = current_app.redis_conn - push_connection(new_instance) + current_app.redis_conn = new_instance -@blueprint.teardown_request -def pop_rq_connection(exception=None): - pop_connection() + def jsonify(f): @@ -137,7 +133,7 @@ def serialize_queues(instance_number, queues): order="asc", page="1", ), - failed_job_registry_count=FailedJobRegistry(q.name).count, + failed_job_registry_count=FailedJobRegistry(q.name, connection=q.connection).count, failed_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -147,7 +143,7 @@ def serialize_queues(instance_number, queues): order="asc", page="1", ), - started_job_registry_count=StartedJobRegistry(q.name).count, + started_job_registry_count=StartedJobRegistry(q.name, connection=q.connection).count, started_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -157,7 +153,7 @@ def serialize_queues(instance_number, queues): order="asc", page="1", ), - deferred_job_registry_count=DeferredJobRegistry(q.name).count, + deferred_job_registry_count=DeferredJobRegistry(q.name, connection=q.connection).count, deferred_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -167,7 +163,7 @@ def serialize_queues(instance_number, queues): order="asc", page="1", ), - finished_job_registry_count=FinishedJobRegistry(q.name).count, + finished_job_registry_count=FinishedJobRegistry(q.name, connection=q.connection).count, finished_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -177,7 +173,7 @@ def serialize_queues(instance_number, queues): order="asc", page="1", ), - canceled_job_registry_count=CanceledJobRegistry(q.name).count, + canceled_job_registry_count=CanceledJobRegistry(q.name, connection=q.connection).count, canceled_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -187,7 +183,7 @@ def serialize_queues(instance_number, queues): order="asc", page="1", ), - scheduled_job_registry_count=ScheduledJobRegistry(q.name).count, + scheduled_job_registry_count=ScheduledJobRegistry(q.name, connection=q.connection).count, scheduled_url=url_for( ".jobs_overview", instance_number=instance_number, @@ -208,12 +204,13 @@ def serialize_date(dt): return arrow.get(dt).to("UTC").datetime.isoformat() -def serialize_job(job): +def serialize_job(job: Job): + latest_result = job.latest_result() return dict( id=job.id, created_at=serialize_date(job.created_at), ended_at=serialize_date(job.ended_at), - exc_info=str(job.exc_info) if job.exc_info else None, + exc_info=str(latest_result.exc_string) if latest_result else None, description=job.description, ) @@ -255,24 +252,24 @@ def favicon(): ) -def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page, order): - queue = Queue(queue_name, serializer=config.serializer) +def get_queue_registry_jobs_count(queue_name, registry_name, offset, per_page, order, connection): + queue = Queue(queue_name, serializer=config.serializer, connection=connection) if registry_name != "queued": if per_page >= 0: per_page = offset + (per_page - 1) if registry_name == "failed": - current_queue = FailedJobRegistry(queue_name) + current_queue = FailedJobRegistry(queue_name, connection=connection) elif registry_name == "deferred": - current_queue = DeferredJobRegistry(queue_name) + current_queue = DeferredJobRegistry(queue_name, connection=connection) elif registry_name == "started": - current_queue = StartedJobRegistry(queue_name) + current_queue = StartedJobRegistry(queue_name, connection=connection) elif registry_name == "finished": - current_queue = FinishedJobRegistry(queue_name) + current_queue = FinishedJobRegistry(queue_name, connection=connection) elif registry_name == "scheduled": - current_queue = ScheduledJobRegistry(queue_name) + current_queue = ScheduledJobRegistry(queue_name, connection=connection) elif registry_name == "canceled": - current_queue = CanceledJobRegistry(queue_name) + current_queue = CanceledJobRegistry(queue_name, connection=connection) else: current_queue = queue total_items = current_queue.count @@ -314,7 +311,7 @@ def queues_overview(instance_number): "rq_dashboard/queues.html", current_instance=instance_number, instance_list=escape_format_instance_list(current_app.config.get("RQ_DASHBOARD_REDIS_URL")), - queues=Queue.all(), + queues=Queue.all(connection=current_app.redis_conn), rq_url_prefix=url_for(".queues_overview"), rq_dashboard_version=rq_dashboard_version, rq_version=rq_version, @@ -368,15 +365,15 @@ def workers_overview(instance_number): ) def jobs_overview(instance_number, queue_name, registry_name, per_page, order, page): if queue_name is None: - queue = Queue(serializer=config.serializer) + queue = Queue(serializer=config.serializer, connection=current_app.redis_conn) else: - queue = Queue(queue_name, serializer=config.serializer) + queue = Queue(queue_name, serializer=config.serializer, connection=current_app.redis_conn) r = make_response( render_template( "rq_dashboard/jobs.html", current_instance=instance_number, instance_list=escape_format_instance_list(current_app.config.get("RQ_DASHBOARD_REDIS_URL")), - queues=Queue.all(), + queues=Queue.all(connection=current_app.redis_conn), queue=queue, per_page=per_page, order=order, @@ -398,7 +395,7 @@ def jobs_overview(instance_number, queue_name, registry_name, per_page, order, p @blueprint.route("//view/job/") def job_view(instance_number, job_id): - job = Job.fetch(job_id) + job = Job.fetch(job_id, connection=current_app.redis_conn) r = make_response( render_template( "rq_dashboard/job.html", @@ -423,7 +420,7 @@ def job_view(instance_number, job_id): @jsonify def delete_job_view(job_id, registry=None): try: - job = Job.fetch(job_id) + job = Job.fetch(job_id, connection=current_app.redis_conn) job.delete() except NoSuchJobError: if registry: @@ -497,7 +494,7 @@ def compact_queue(queue_name): @blueprint.route("//data/queues.json") @jsonify def list_queues(instance_number): - queues = serialize_queues(instance_number, sorted(Queue.all())) + queues = serialize_queues(instance_number, sorted(Queue.all(connection=current_app.redis_conn))) return dict(queues=queues) @@ -511,7 +508,7 @@ def list_jobs(instance_number, queue_name, registry_name, per_page, order, page) offset = (current_page - 1) * per_page total_items, jobs = get_queue_registry_jobs_count( - queue_name, registry_name, offset, per_page, order + queue_name, registry_name, offset, per_page, order, current_app.redis_conn ) pages_numbers_in_window = pagination_window(total_items, current_page, per_page) @@ -652,7 +649,7 @@ def serialize_queue_names(worker): version=getattr(worker, "version", ""), python_version=getattr(worker, "python_version", ""), ) - for worker in Worker.all() + for worker in Worker.all(connection=current_app.redis_conn) ), key=lambda w: (w["state"], w["queues"], w["name"]), ) diff --git a/tests/test_basic.py b/tests/test_basic.py index 041cf94..af17340 100644 --- a/tests/test_basic.py +++ b/tests/test_basic.py @@ -3,7 +3,7 @@ import unittest import redis -from rq import Queue, Worker, pop_connection, push_connection +from rq import Queue, Worker from rq_dashboard.cli import make_flask_app from rq_dashboard.web import escape_format_instance_list @@ -24,13 +24,12 @@ def setUp(self): self.app.testing = True self.app.config['RQ_DASHBOARD_REDIS_URL'] = ['redis://127.0.0.1'] self.app.redis_conn = self.get_redis_client() - push_connection(self.get_redis_client()) self.client = self.app.test_client() def tearDown(self): q = Queue(connection=self.app.redis_conn) q.empty() - pop_connection() + def test_dashboard_ok(self): response = self.client.get('/') @@ -89,7 +88,7 @@ def test_registry_jobs_list(self): self.assertIn('jobs', data) def test_worker_python_version_field(self): - w = Worker(['q']) + w = Worker(['q'], connection=self.app.redis_conn) w.register_birth() response = self.client.get('/0/data/workers.json') data = json.loads(response.data.decode('utf8')) @@ -100,7 +99,7 @@ def test_worker_python_version_field(self): w.register_death() def test_worker_version_field(self): - w = Worker(['q']) + w = Worker(['q'], connection=self.app.redis_conn) w.register_birth() response = self.client.get('/0/data/workers.json') data = json.loads(response.data.decode('utf8'))