Conversation

@jheld
Contributor

@jheld jheld commented Nov 22, 2024

This adds Elasticsearch as a task event history backend for Flower.

Proposed Solution

Note: this is essentially still the original PR #821 (I don't recall why I closed it).

This is mainly two pieces:

  • indexing events into Elasticsearch in a reasonably efficient manner (for both the Flower process and Elasticsearch)
  • searching/sorting/pagination of task history & task lookup by means of Elasticsearch

Done so far (working)

  • indexing tasks into Elasticsearch

    • I have a background thread that buffers task events in a queue and sends bulk index requests to Elasticsearch
  • Searching (moderate support for different fields) and sorting on all fields. Sorting & pagination work but need more QA.

  • Dashboard able to pull from Elasticsearch (at startup)
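The buffering approach from the first bullet can be sketched roughly like this. This is illustrative only: the class and parameter names are mine, not the PR's, and the Elasticsearch bulk call is stubbed out as a plain `send_bulk` callable (something like `elasticsearch.helpers.bulk` in the real code).

```python
import queue
import threading
import time


class BulkIndexer:
    """Buffer task events on a queue; a background thread flushes them in bulk."""

    def __init__(self, send_bulk, batch_size=500, flush_interval=1.0):
        self.events = queue.Queue()
        self.send_bulk = send_bulk          # stand-in for the ES bulk API call
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def put(self, event):
        self.events.put(event)

    def stop(self):
        """Signal the thread to drain the queue, then wait for it."""
        self._stop.set()
        self._thread.join()

    def _run(self):
        batch = []
        deadline = time.monotonic() + self.flush_interval
        while not self._stop.is_set() or not self.events.empty():
            try:
                timeout = max(deadline - time.monotonic(), 0.01)
                batch.append(self.events.get(timeout=timeout))
            except queue.Empty:
                pass
            # Flush when the batch is full or the flush interval has elapsed.
            if batch and (len(batch) >= self.batch_size
                          or time.monotonic() >= deadline):
                self.send_bulk(batch)
                batch = []
            if time.monotonic() >= deadline:
                deadline = time.monotonic() + self.flush_interval
        if batch:                            # final drain on shutdown
            self.send_bulk(batch)
```

Batching by size *and* by time keeps both latency and per-request overhead bounded, which is the "efficient for the process and for Elasticsearch" trade-off mentioned above.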

Questions

  • Where will this logic live? A flower subcommand, or flower proper (with Elasticsearch flag settings)? @johnarnold

  • Originally I had the indexer outside of flower. It's now in flower, but configured in a rough standalone mode, currently driven only by argv. There are a few --elasticsearch flags to control the behavior, intended for draft/dev use only for now.

  • I am using kombu's LRUCache to cache certain search_after queries for task history pagination. It is my way around Elasticsearch's pagination restrictions, and it keeps deep pagination requests very performant in my testing.

  • Can we improve the Elasticsearch indexing process?
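The search_after caching idea from the LRUCache bullet can be sketched as follows. The PR uses kombu's LRUCache; the minimal one here only keeps the sketch self-contained, and `run_query` is a hypothetical stand-in for the actual Elasticsearch search call.

```python
from collections import OrderedDict


class LRUCache:
    """Tiny stand-in for kombu's LRUCache, to keep the sketch self-contained."""

    def __init__(self, limit=128):
        self.limit = limit
        self.data = OrderedDict()

    def get(self, key):
        if key in self.data:
            self.data.move_to_end(key)
            return self.data[key]
        return None

    def put(self, key, value):
        self.data[key] = value
        self.data.move_to_end(key)
        while len(self.data) > self.limit:
            self.data.popitem(last=False)


def fetch_page(run_query, cache, sort_by, order, offset, length):
    """Fetch one page, resuming from a cached search_after cursor when possible.

    run_query(sort_by, order, search_after, size) stands in for the
    Elasticsearch search; each hit it returns carries a 'sort' key.
    """
    key = (sort_by, order, offset)
    after = cache.get(key)
    if after is not None:
        hits = run_query(sort_by, order, search_after=after, size=length)
    else:
        # Cold path: no cached cursor, fall back to from/size-style paging.
        hits = run_query(sort_by, order, search_after=None,
                         size=offset + length)[offset:]
    if hits:
        # Remember the last hit's sort values so the *next* page is cheap.
        cache.put((sort_by, order, offset + length), hits[-1]['sort'])
    return hits
```

Caching the last hit's sort values per (sort, offset) key is what sidesteps the deep-pagination limits: each sequential page becomes a cheap search_after query instead of an ever-growing from/size scan.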

@jheld jheld force-pushed the jheld/elasticsearch_history branch from 789688c to 47cea6f on November 23, 2024 02:46
@auvipy auvipy requested review from auvipy and Copilot August 17, 2025 15:32

Copilot AI left a comment


Pull Request Overview

This PR adds Elasticsearch support to Flower for task event history management. It introduces an alternative backend to store and query task events in Elasticsearch instead of relying solely on in-memory storage.

  • Adds Elasticsearch indexing capabilities through a background thread that buffers and bulk indexes task events
  • Implements search, sorting, and pagination functionality using Elasticsearch queries
  • Introduces new configuration options for Elasticsearch connection, indexing behavior, and data retention

Reviewed Changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.

File Description
setup.py Adds Elasticsearch dependencies and console script entry points
flower/views/tasks.py Integrates Elasticsearch querying for task views with fallback to in-memory storage
flower/utils/tasks.py Extends task filtering parameters and adds type annotation
flower/utils/search.py Enhances search term parsing with new fields and time-based filtering
flower/urls.py Adds Elasticsearch refresh API endpoint
flower/options.py Defines new Elasticsearch configuration options
flower/logging_utils.py Adds custom logging formatter for Celery exceptions
flower/indexer_app.py Creates dedicated indexer application for Elasticsearch event processing
flower/events.py Adds Elasticsearch dashboard data retrieval functionality
flower/elasticsearch_events.py Core Elasticsearch indexing logic with background threading
flower/command.py Adds new indexer command for standalone Elasticsearch indexing
flower/api/tasks.py Integrates Elasticsearch support in task API endpoints
flower/api/elasticsearch_history.py Implements Elasticsearch-based task history API handlers
flower/__init__.py Updates version number
flower/indexer.py Entry point for standalone indexer command
examples/tasks.py Adds example chained task
docs/config.rst Documents new Elasticsearch configuration options
.pylintrc Disables too-many-positional-arguments warning


task_dict['worker'] = task_dict['worker'].hostname
self.write(dict(draw=draw, data=filtered_tasks,
                recordsTotal=total_records,
                recordsFiltered=records_filtered))  # bug?

Copilot AI Aug 17, 2025


The comment '# bug?' suggests uncertainty about the correctness of this code. The recordsFiltered value should accurately reflect the number of filtered records, but the logic may be incorrect when switching between Elasticsearch and in-memory backends.

    parsed_search['root_id'] = preprocess_search_value(query_part[len('root_id:'):])
elif query_part.startswith('parent_id:'):
    parsed_search['parent_id'] = preprocess_search_value(query_part[len('parent_id:'):])
if parsed_search:

Copilot AI Aug 17, 2025


This condition checks if parsed_search has any content, but it's set to False after each search term is processed. This logic appears incorrect as it would always evaluate parsed_search as a dict, not a boolean.

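For context on that check: in Python a dict's truthiness is simply its non-emptiness, so `if parsed_search:` fires only once at least one search term has been parsed. A quick illustration:

```python
parsed_search = {}
assert not parsed_search              # an empty dict is falsy

parsed_search['root_id'] = 'abc123'   # illustrative value, not from the PR
assert parsed_search                  # a non-empty dict is truthy
```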
@jheld
Contributor Author

jheld commented Sep 10, 2025

@auvipy (as a repost from personal account) thank you for the fixes. How far do you think this PR is from being merged? Anything I can do to help? Docs, comments, screenshots, etc.

@auvipy
Collaborator

auvipy commented Sep 11, 2025

I need some more time to properly review and test this....



EXTRAS_REQUIRE = {
"elasticsearch": ["elasticsearch>=5.4,<6.4", "elasticsearch_dsl>=5.4,<6.4", "requests>=2.13,<3", ],
Collaborator


Why not more recent versions?

Contributor Author


I think this is just outdated because it's a very long-lived branch. We should be safe to support whatever celery does.

@auvipy auvipy requested review from ask, auvipy and Copilot and removed request for ask September 11, 2025 06:59

Copilot AI left a comment


Pull Request Overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 8 comments.



Comment on lines +66 to +67
for re_match in [m for m in re.finditer(r"<Task: \w+([.]\w+)*\((?P<task_uuid>\w+(-\w+)+)\) \w+ clock:\d+>", task.children_raw) if m]:
    task.children.append(Task(uuid=re_match.group("task_uuid")))

Copilot AI Sep 11, 2025


This regex parsing and Task creation logic is fragile and could break if the task representation format changes. Consider using a more robust method to parse task children or storing them in a structured format in Elasticsearch.

Suggested change
for re_match in [m for m in re.finditer(r"<Task: \w+([.]\w+)*\((?P<task_uuid>\w+(-\w+)+)\) \w+ clock:\d+>", task.children_raw) if m]:
    task.children.append(Task(uuid=re_match.group("task_uuid")))
# Use structured children if available, else fall back to regex parsing
if isinstance(task.children_raw, list):
    # Assume list of UUIDs or dicts with 'uuid'
    for child in task.children_raw:
        if isinstance(child, dict) and 'uuid' in child:
            task.children.append(Task(uuid=child['uuid']))
        elif isinstance(child, str):
            task.children.append(Task(uuid=child))
elif isinstance(task.children_raw, str):
    for re_match in [m for m in re.finditer(r"<Task: \w+([.]\w+)*\((?P<task_uuid>\w+(-\w+)+)\) \w+ clock:\d+>", task.children_raw) if m]:
        task.children.append(Task(uuid=re_match.group("task_uuid")))

)
cache_value = sorted_tasks.execute().hits.hits[-1]['sort']
sorted_tasks = es_s.extra(from_=0, size=length, search_after=cache_value).sort(
    {sort_by: 'asc' if not sort_order else 'desc'}, {'_uid': 'desc', }).execute().hits

Copilot AI Sep 11, 2025


The sort order logic is inverted. When sort_order is True (descending), the condition 'asc' if not sort_order else 'desc' evaluates to 'desc', but it should be 'desc' when sort_order is True. This should be 'desc' if sort_order else 'asc'.

Suggested change
{sort_by: 'asc' if not sort_order else 'desc'}, {'_uid': 'desc', }).execute().hits
{sort_by: 'desc' if sort_order else 'asc'}, {'_uid': 'desc', }).execute().hits
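Worth noting: for a boolean `sort_order`, the original expression and the suggested one are logically equivalent, so the suggestion is a readability improvement rather than a behavior change. A quick truth-table check confirms this:

```python
# Both expressions map sort_order=True -> 'desc' and sort_order=False -> 'asc'.
for sort_order in (True, False):
    original = 'asc' if not sort_order else 'desc'
    suggested = 'desc' if sort_order else 'asc'
    assert original == suggested
```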

Comment on lines +50 to +57
custom_es_setup = True
if custom_es_setup:
    app.loader.import_default_modules()
    if getattr(app.conf, 'timezone', None):
        os.environ['TZ'] = app.conf.timezone
        time.tzset()
flower_app = Flower(capp=app, options=options, **settings)


Copilot AI Sep 11, 2025


The custom_es_setup flag is hardcoded to True and creates duplicate Flower app initialization. This should be refactored to either remove the flag or make it configurable, and eliminate the duplicate app creation.

Suggested change
custom_es_setup = True
if custom_es_setup:
    app.loader.import_default_modules()
    if getattr(app.conf, 'timezone', None):
        os.environ['TZ'] = app.conf.timezone
        time.tzset()
flower_app = Flower(capp=app, options=options, **settings)
app.loader.import_default_modules()
if getattr(app.conf, 'timezone', None):
    os.environ['TZ'] = app.conf.timezone
    time.tzset()

Comment on lines +277 to +288
uuid=task.uuid,
worker=task.hostname,
info=task.info(),
received=received_time,
started=start_time,
task.name,
task.uuid,
task.hostname,
received_time,
start_time,
succeeded_time,
task.info(),

Copilot AI Sep 11, 2025


This logging statement has incorrect syntax. Lines 282-287 use keyword arguments, but lines 288-294 are positional arguments without keywords. This will cause a SyntaxError. Either use all keyword arguments or all positional arguments.
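The SyntaxError claim is easy to verify: Python rejects a positional argument that follows a keyword argument at compile time, before any code runs. A minimal check (the call text is illustrative, not the PR's actual statement):

```python
# A positional argument after a keyword argument is rejected at compile time.
try:
    compile("log(name=task.name, task.uuid)", "<example>", "eval")
    raised = False
except SyntaxError:
    raised = True

assert raised  # "positional argument follows keyword argument"
```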
