From 60a6eeb5464c2bc950b096c8ec048035411bd102 Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Wed, 18 Dec 2024 21:05:39 +0530 Subject: [PATCH 1/8] #45037: Support for additional celery config directly from airflow.cfg file --- chart/values.yaml | 1 + .../src/airflow/providers/celery/executors/default_celery.py | 3 +++ 2 files changed, 4 insertions(+) diff --git a/chart/values.yaml b/chart/values.yaml index adf68c3a194d3..34ba7ae8d855f 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -2607,6 +2607,7 @@ config: celery: flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}' worker_concurrency: 16 + extra_celery_config: '{}' scheduler: standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}' # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0 diff --git a/providers/src/airflow/providers/celery/executors/default_celery.py b/providers/src/airflow/providers/celery/executors/default_celery.py index 75f8cc2bfdf43..c99eb0178c46e 100644 --- a/providers/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/src/airflow/providers/celery/executors/default_celery.py @@ -69,6 +69,8 @@ def _broker_supports_visibility_timeout(url): log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.") result_backend = f'db+{conf.get("database", "SQL_ALCHEMY_CONN")}' +extra_celery_config: dict = conf.getjson("celery", "extra_celery_config", fallback={}) if conf.has_option("celery", "extra_celery_config") else {} + DEFAULT_CELERY_CONFIG = { "accept_content": ["json"], "event_serializer": "json", @@ -85,6 +87,7 @@ def _broker_supports_visibility_timeout(url): ), "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY", fallback=16), "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control", fallback=True), + **extra_celery_config } From 93f5b25c3fa1411ebe1f39e7360c7a2ac45e4a3d Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Wed, 18 Dec 2024 23:22:43 +0530 Subject: [PATCH 2/8] Added unit test for the additional config and addressed comments --- chart/values.yaml | 1 - .../providers/celery/executors/default_celery.py | 2 +- providers/src/airflow/providers/celery/provider.yaml | 7 +++++++ .../tests/celery/executors/test_celery_executor.py | 11 +++++++++++ 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/chart/values.yaml b/chart/values.yaml index 34ba7ae8d855f..adf68c3a194d3 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -2607,7 +2607,6 @@ config: celery: flower_url_prefix: '{{ ternary "" .Values.ingress.flower.path (eq .Values.ingress.flower.path "/") }}' worker_concurrency: 16 - extra_celery_config: '{}' scheduler: standalone_dag_processor: '{{ ternary "True" "False" .Values.dagProcessor.enabled }}' # statsd params included for Airflow 1.10 backward compatibility; moved to [metrics] in 2.0 diff --git a/providers/src/airflow/providers/celery/executors/default_celery.py b/providers/src/airflow/providers/celery/executors/default_celery.py index c99eb0178c46e..224dd2baf04d6 100644 --- a/providers/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/src/airflow/providers/celery/executors/default_celery.py @@ -69,7 +69,7 @@ def _broker_supports_visibility_timeout(url): log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.") result_backend = f'db+{conf.get("database", "SQL_ALCHEMY_CONN")}' -extra_celery_config: dict = conf.getjson("celery", "extra_celery_config", fallback={}) if conf.has_option("celery", "extra_celery_config") else {} +extra_celery_config: dict = conf.getjson("celery", "extra_celery_config", fallback={}) DEFAULT_CELERY_CONFIG = { "accept_content": ["json"], diff --git a/providers/src/airflow/providers/celery/provider.yaml b/providers/src/airflow/providers/celery/provider.yaml index 906a76130456f..54cabeded5fbb 100644 --- a/providers/src/airflow/providers/celery/provider.yaml +++ b/providers/src/airflow/providers/celery/provider.yaml @@ -330,6 +330,13 @@ config: type: string example: ~ default: "False" + extra_celery_config: + description: | + Extra celery configs to include in the celery worker + version_added: ~ + type: string + example: ~ + default: "{}" celery_broker_transport_options: description: | This section is for specifying options which can be passed to the diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 7dc918082b62a..a630cd3e6b99f 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -399,3 +399,14 @@ def test_celery_task_acks_late_loaded_from_string(): # reload celery conf to apply the new config importlib.reload(default_celery) assert default_celery.DEFAULT_CELERY_CONFIG["task_acks_late"] is False + + +@conf_vars({("celery", "extra_celery_config"): '{"worker_max_tasks_per_child": 10}'}) +def test_celery_extra_celery_config_loaded_from_string(): + import importlib + + # reload celery conf to apply the new config + importlib.reload(default_celery) + assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == { + "worker_max_tasks_per_child": 10 + } From c422c6f71094a747597e1091e3f7ed97b4265dfd Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Wed, 18 Dec 2024 23:38:02 +0530 Subject: [PATCH 3/8] Added unit test for the additional config and addressed comments --- .../src/airflow/providers/celery/executors/default_celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/src/airflow/providers/celery/executors/default_celery.py b/providers/src/airflow/providers/celery/executors/default_celery.py index 224dd2baf04d6..263791f89433c 100644 --- a/providers/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/src/airflow/providers/celery/executors/default_celery.py @@ -87,7 +87,7 @@ def _broker_supports_visibility_timeout(url): ), "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY", fallback=16), "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control", fallback=True), - **extra_celery_config + **extra_celery_config, } From c95613da85333e2b1c9ed577bd9334376238b187 Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Sun, 22 Dec 2024 16:42:37 +0530 Subject: [PATCH 4/8] Addressed comments: Added sample config in the docs as well as reference link for all the celery configs. --- providers/src/airflow/providers/celery/provider.yaml | 4 +++- providers/tests/celery/executors/test_celery_executor.py | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/providers/src/airflow/providers/celery/provider.yaml b/providers/src/airflow/providers/celery/provider.yaml index 54cabeded5fbb..56fca3162d902 100644 --- a/providers/src/airflow/providers/celery/provider.yaml +++ b/providers/src/airflow/providers/celery/provider.yaml @@ -332,7 +332,9 @@ config: default: "False" extra_celery_config: description: | - Extra celery configs to include in the celery worker + Extra celery configs to include in the celery worker. Any of the celery config can be added to this config and it will be applied while starting the celery worker. e.g. {"worker_max_tasks_per_child": 10} + See also: + https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration-and-defaults version_added: ~ type: string example: ~ diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index a630cd3e6b99f..ee69a094dc9f4 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -407,6 +407,4 @@ def test_celery_extra_celery_config_loaded_from_string(): # reload celery conf to apply the new config importlib.reload(default_celery) - assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == { - "worker_max_tasks_per_child": 10 - } + assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == {"worker_max_tasks_per_child": 10} From 99bfefbd09a7af441d01fbc485273ceaf0562a7d Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Sun, 22 Dec 2024 16:42:37 +0530 Subject: [PATCH 5/8] Addressed comments: Added sample config in the docs as well as reference link for all the celery configs. --- providers/src/airflow/providers/celery/provider.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/providers/src/airflow/providers/celery/provider.yaml b/providers/src/airflow/providers/celery/provider.yaml index 56fca3162d902..a69dae610a935 100644 --- a/providers/src/airflow/providers/celery/provider.yaml +++ b/providers/src/airflow/providers/celery/provider.yaml @@ -332,7 +332,9 @@ config: default: "False" extra_celery_config: description: | - Extra celery configs to include in the celery worker. Any of the celery config can be added to this config and it will be applied while starting the celery worker. e.g. {"worker_max_tasks_per_child": 10} + Extra celery configs to include in the celery worker. + Any of the celery config can be added to this config and it + will be applied while starting the celery worker. e.g. {"worker_max_tasks_per_child": 10} See also: https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration-and-defaults version_added: ~ From e3de3b606f33d94b0497fb6010de016e0081ec34 Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Mon, 23 Dec 2024 14:30:05 +0530 Subject: [PATCH 6/8] Fixed the Unit tests --- .../src/airflow/providers/celery/executors/default_celery.py | 2 +- providers/src/airflow/providers/celery/provider.yaml | 2 +- providers/tests/celery/executors/test_celery_executor.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/src/airflow/providers/celery/executors/default_celery.py b/providers/src/airflow/providers/celery/executors/default_celery.py index 263791f89433c..616d2c0de9cc7 100644 --- a/providers/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/src/airflow/providers/celery/executors/default_celery.py @@ -87,7 +87,7 @@ def _broker_supports_visibility_timeout(url): ), "worker_concurrency": conf.getint("celery", "WORKER_CONCURRENCY", fallback=16), "worker_enable_remote_control": conf.getboolean("celery", "worker_enable_remote_control", fallback=True), - **extra_celery_config, + **(extra_celery_config if isinstance(extra_celery_config, dict) else {}), } diff --git a/providers/src/airflow/providers/celery/provider.yaml b/providers/src/airflow/providers/celery/provider.yaml index a69dae610a935..5a45989804614 100644 --- a/providers/src/airflow/providers/celery/provider.yaml +++ b/providers/src/airflow/providers/celery/provider.yaml @@ -340,7 +340,7 @@ config: version_added: ~ type: string example: ~ - default: "{}" + default: "{{}}" celery_broker_transport_options: description: | This section is for specifying options which can be passed to the diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index ee69a094dc9f4..080bc9a898d69 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -407,4 +407,4 @@ def test_celery_extra_celery_config_loaded_from_string(): # reload celery conf to apply the new config importlib.reload(default_celery) - assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == {"worker_max_tasks_per_child": 10} + assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == 10 From ad8a922d1c3a0871ec2b347bbfcc23e3b92f82e3 Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Mon, 23 Dec 2024 16:34:08 +0530 Subject: [PATCH 7/8] Fixed the Unit tests --- providers/tests/celery/executors/test_celery_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/tests/celery/executors/test_celery_executor.py b/providers/tests/celery/executors/test_celery_executor.py index 080bc9a898d69..7a33e0cfbc17c 100644 --- a/providers/tests/celery/executors/test_celery_executor.py +++ b/providers/tests/celery/executors/test_celery_executor.py @@ -407,4 +407,4 @@ def test_celery_extra_celery_config_loaded_from_string(): # reload celery conf to apply the new config importlib.reload(default_celery) - assert default_celery.DEFAULT_CELERY_CONFIG["extra_celery_config"] == 10 + assert default_celery.DEFAULT_CELERY_CONFIG["worker_max_tasks_per_child"] == 10 From 13ed443af73d309595808bdb09072e824f896ebe Mon Sep 17 00:00:00 2001 From: Sachin Arora Date: Mon, 23 Dec 2024 18:27:30 +0530 Subject: [PATCH 8/8] Fixed the Unit tests --- .../src/airflow/providers/celery/executors/default_celery.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/src/airflow/providers/celery/executors/default_celery.py b/providers/src/airflow/providers/celery/executors/default_celery.py index 616d2c0de9cc7..20c307a77b04f 100644 --- a/providers/src/airflow/providers/celery/executors/default_celery.py +++ b/providers/src/airflow/providers/celery/executors/default_celery.py @@ -69,7 +69,7 @@ def _broker_supports_visibility_timeout(url): log.debug("Value for celery result_backend not found. Using sql_alchemy_conn with db+ prefix.") result_backend = f'db+{conf.get("database", "SQL_ALCHEMY_CONN")}' -extra_celery_config: dict = conf.getjson("celery", "extra_celery_config", fallback={}) +extra_celery_config = conf.getjson("celery", "extra_celery_config", fallback={}) DEFAULT_CELERY_CONFIG = { "accept_content": ["json"],