
KubernetesJobOperator fails if you launch more than one pod #44994

Open
osintalex opened this issue Dec 17, 2024 · 2 comments
Labels
area:providers, kind:bug, needs-triage, provider:cncf-kubernetes

Comments

@osintalex

Apache Airflow Provider(s)

cncf-kubernetes

Versions of Apache Airflow Providers

8.4.1

Apache Airflow version

2.10.2

Operating System

Not sure; running in GCP Cloud Composer

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

If you launch a KubernetesJobOperator, it tries to find the job's pod after execution. If the job has launched multiple pods, the operator then fails because the pod lookup cannot resolve a single pod when more than one matches the label selector.

I can pinpoint the place in the source code if you give me a link; I just can't find where it is on GitHub, but I have identified it locally.

It's when raise AirflowException(f"More than one pod running with labels {label_selector}") gets called.
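
For reference, the lookup that raises this is roughly the following (a paraphrased sketch of the provider's pod lookup in pod.py, not verbatim source; names and details may differ between versions):

# Paraphrased sketch of the provider's pod lookup, not verbatim source.
pod_list = self.client.list_namespaced_pod(
    namespace=namespace,
    label_selector=label_selector,
).items

if len(pod_list) > 1:
    # A job running with parallelism > 1 always lands in this branch.
    raise AirflowException(f"More than one pod running with labels {label_selector}")
elif len(pod_list) == 1:
    pod = pod_list[0]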

What you think should happen instead

There should be some flag in the job operator constructor that prevents this behaviour, since many k8s jobs will launch more than one pod; alternatively, the find_pod logic should be smart enough to expect multiple pods when the job has a parallelism count greater than one.

How to reproduce

Launch a KubernetesJobOperator with parallelism > 1; I hit it every time. This seems so basic, though, that I wonder if I am doing something wrong, since I would have expected other people to run into it already if that were the case.

I am running indexed jobs with completions equal to the parallelism count.

Full config looks like this:

# Imports shown for completeness; PARALLELISM and IMAGE_NAME are defined elsewhere.
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator
from kubernetes.client import models as k8s_models

my_task = KubernetesJobOperator(
    backoff_limit=18,
    wait_until_job_complete=True,
    completion_mode="Indexed",
    completions=PARALLELISM,
    parallelism=PARALLELISM,
    ttl_seconds_after_finished=60 * 30,
    reattach_on_restart=False,
    get_logs=False,
    labels={
        "app.kubernetes.io/type": "<my pdb selector>",
    },
    job_poll_interval=60,
    config_file="/home/airflow/composer_kube_config",
    task_id="my_task",
    namespace="composer-user-workloads",
    name="my_task",
    image=IMAGE_NAME,
    cmds=["<my command>"],
    arguments=["<my args>"],
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "250m", "memory": "512Mi"},
        limits={"memory": "512Mi"},
    ),
    kubernetes_conn_id="kubernetes_default",
    retries=1,
)

Anything else

I have fixed this issue by setting reattach_on_restart to False, which prevents the labels issue but has the side effect of producing a stray pod. I then delete that pod with a PythonOperator that uses the Kubernetes hook:


  # Imports shown for completeness.
  from airflow.operators.python import PythonOperator
  from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
  from kubernetes.client import models as k8s_models

  def delete_all_pods_from_job():
      # Use the same connection and config file as the job operator.
      k8s_hook = KubernetesHook(
          conn_id="kubernetes_default",
          config_file="/home/airflow/composer_kube_config",
      )
      # Find every pod the job left behind, matched by label.
      pod_list: k8s_models.V1PodList = k8s_hook.core_v1_client.list_namespaced_pod(
          namespace="composer-user-workloads", label_selector="mylabel=true"
      )
      print(f"found {len(pod_list.items)} pods to delete")
      for pod in pod_list.items:
          print(f"Deleting pod with name {pod.metadata.name}")
          k8s_hook.core_v1_client.delete_namespaced_pod(
              name=pod.metadata.name, namespace=pod.metadata.namespace
          )

  cleanup_pods = PythonOperator(
      task_id="cleanup_pods",
      python_callable=delete_all_pods_from_job,
  )
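
For completeness, one way this might be wired into the DAG (illustrative only, not part of the issue as filed; the trigger rule is an assumption so the cleanup also runs if the job task fails):

  # Illustrative wiring, not part of the issue as filed: run the cleanup right
  # after the job task. Passing trigger_rule="all_done" to the PythonOperator
  # (an assumption) would make the cleanup run even if my_task fails.
  my_task >> cleanup_pods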

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct

osintalex added the area:providers, kind:bug, and needs-triage labels on Dec 17, 2024

boring-cyborg bot commented Dec 17, 2024

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

dosubot bot added the provider:cncf-kubernetes label on Dec 17, 2024
@osintalex
Author

Ah found it in the source!

self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`

calls into

pod = self.find_pod(pod_request_obj.metadata.namespace, context=context)

which then calls into

def _build_find_pod_label_selector(self, context: Context | None = None, *, exclude_checked=True) -> str:

From what I can tell, this will always match more than one pod if you have a parallelism count > 1,

which in turn triggers this error https://github.com/apache/airflow/blob/main/providers/src/airflow/providers/cncf/kubernetes/operators/pod.py#L532

Unless I'm missing something, I think this could be fixed by adding a condition here

that skips this section when parallelism is > 1 and perhaps logs that this is happening because the job has more than one pod.
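
A very rough sketch of the kind of guard I mean (illustrative only, not the actual provider code; it assumes parallelism is accessible wherever the operator resolves a single pod):

# Illustrative only: not the provider's code. Assumes `self.parallelism` is
# available at the point where the operator tries to resolve a single pod.
if self.parallelism and self.parallelism > 1:
    self.log.info(
        "Job runs up to %s pods in parallel; skipping single-pod discovery.",
        self.parallelism,
    )
else:
    self.pod = self.get_or_create_pod(pod_request_obj, context)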

osintalex changed the title from "KubernetesJobOperator fails if launch more than one pod" to "KubernetesJobOperator fails if you launch more than one pod" on Dec 17, 2024