Merge branch 'main' into patch-4
gsingh935 authored Dec 19, 2024
2 parents ff63e7a + 65b110d commit 1dbad96
Showing 57 changed files with 2,556 additions and 2,112 deletions.
airflow/api_fastapi/core_api/datamodels/dag_run.py (2 changes: 1 addition & 1 deletion)

@@ -54,7 +54,7 @@ class DAGRunClearBody(BaseModel):
 class DAGRunResponse(BaseModel):
     """DAG Run serializer for responses."""

-    dag_run_id: str | None = Field(validation_alias="run_id")
+    dag_run_id: str = Field(validation_alias="run_id")
     dag_id: str
     logical_date: datetime | None
     queued_at: datetime | None
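
As context for this change, a minimal standalone sketch (not part of the diff) of how Pydantic's validation_alias behaves once the `| None` is dropped: the field is still populated from the run_id key, but a missing or null value now fails validation instead of producing a null dag_run_id in responses.

from pydantic import BaseModel, Field, ValidationError

class DAGRunResponse(BaseModel):
    """Trimmed stand-in for the real response model."""
    dag_run_id: str = Field(validation_alias="run_id")

# Populated from "run_id", serialized back out as "dag_run_id".
resp = DAGRunResponse.model_validate({"run_id": "manual__2024-12-19T00:00:00"})
print(resp.model_dump())  # {'dag_run_id': 'manual__2024-12-19T00:00:00'}

# With `| None` dropped, a null run_id no longer passes validation.
try:
    DAGRunResponse.model_validate({"run_id": None})
except ValidationError:
    print("null run_id rejected")
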
airflow/api_fastapi/core_api/openapi/v1-generated.yaml (4 changes: 1 addition & 3 deletions)

@@ -7097,9 +7097,7 @@ components:
     DAGRunResponse:
       properties:
         dag_run_id:
-          anyOf:
-            - type: string
-            - type: 'null'
+          type: string
           title: Dag Run Id
         dag_id:
           type: string
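
This file is the regenerated OpenAPI artifact of the model change above, not a hand edit. A standalone sketch (Pydantic v2 assumed) of why the schema collapses from anyOf to a single type:

from pydantic import BaseModel

class Old(BaseModel):
    dag_run_id: str | None

class New(BaseModel):
    dag_run_id: str

# Optional fields render as anyOf [string, null]; a plain required str
# collapses to a single type -- exactly the diff in the generated spec.
print(Old.model_json_schema()["properties"]["dag_run_id"])
# {'anyOf': [{'type': 'string'}, {'type': 'null'}], 'title': 'Dag Run Id'}
print(New.model_json_schema()["properties"]["dag_run_id"])
# {'type': 'string', 'title': 'Dag Run Id'}
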
airflow/api_fastapi/execution_api/routes/task_instances.py (7 changes: 6 additions & 1 deletion)

@@ -180,6 +180,8 @@ def ti_update_state(
     Not all state transitions are valid, and transitioning to some states requires extra information to be
     passed along. (Check out the datamodels for details, the rendered docs might not reflect this accurately)
     """
+    updated_state: str = ""
+
     # We only use UUID above for validation purposes
     ti_id_str = str(task_instance_id)

@@ -207,6 +209,7 @@
         if ti_patch_payload.state == State.FAILED:
             # clear the next_method and next_kwargs
             query = query.values(next_method=None, next_kwargs=None)
+            updated_state = State.FAILED
     elif isinstance(ti_patch_payload, TIDeferredStatePayload):
         # Calculate timeout if it was passed
         timeout = None

@@ -231,6 +234,7 @@
             next_kwargs=ti_patch_payload.trigger_kwargs,
             trigger_timeout=timeout,
         )
+        updated_state = State.DEFERRED
     elif isinstance(ti_patch_payload, TIRescheduleStatePayload):
         task_instance = session.get(TI, ti_id_str)
         actual_start_date = timezone.utcnow()

@@ -252,11 +256,12 @@
         query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
         # clear the next_method and next_kwargs so that none of the retries pick them up
         query = query.values(state=State.UP_FOR_RESCHEDULE, next_method=None, next_kwargs=None)
+        updated_state = State.UP_FOR_RESCHEDULE
     # TODO: Replace this with FastAPI's Custom Exception handling:
     # https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers
     try:
         result = session.execute(query)
-        log.info("TI %s state updated: %s row(s) affected", ti_id_str, result.rowcount)
+        log.info("TI %s state updated to %s: %s row(s) affected", ti_id_str, updated_state, result.rowcount)
     except SQLAlchemyError as e:
         log.error("Error updating Task Instance state: %s", e)
         raise HTTPException(
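
The shape of this change: the new state is baked into a SQLAlchemy UPDATE via query.values(...), so each branch also stashes it in a local updated_state for the success log. A standalone sketch of the same pattern, with a hypothetical table in place of Airflow's models:

import logging

from sqlalchemy import Column, MetaData, String, Table, update

log = logging.getLogger(__name__)

metadata = MetaData()
# Hypothetical two-column table standing in for the task_instance model.
task_instance = Table(
    "task_instance",
    metadata,
    Column("id", String, primary_key=True),
    Column("state", String),
)

def set_ti_state(conn, ti_id: str, new_state: str) -> None:
    updated_state = new_state  # remembered only so the log line can report it
    stmt = update(task_instance).where(task_instance.c.id == ti_id).values(state=new_state)
    result = conn.execute(stmt)
    log.info("TI %s state updated to %s: %s row(s) affected", ti_id, updated_state, result.rowcount)
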
airflow/callbacks/callback_requests.py (105 changes: 25 additions & 80 deletions)

@@ -16,49 +16,38 @@
 # under the License.
 from __future__ import annotations

-import json
 from typing import TYPE_CHECKING

+from pydantic import BaseModel
+
+from airflow.api_fastapi.execution_api.datamodels import taskinstance as ti_datamodel  # noqa: TC001
 from airflow.utils.state import TaskInstanceState

 if TYPE_CHECKING:
-    from airflow.models.taskinstance import SimpleTaskInstance
+    from airflow.typing_compat import Self


-class CallbackRequest:
+class CallbackRequest(BaseModel):
     """
     Base Class with information about the callback to be executed.

-    :param full_filepath: File Path to use to run the callback
-    :param msg: Additional Message that can be used for logging
-    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
     """

-    def __init__(
-        self,
-        full_filepath: str,
-        processor_subdir: str | None = None,
-        msg: str | None = None,
-    ):
-        self.full_filepath = full_filepath
-        self.processor_subdir = processor_subdir
-        self.msg = msg
-
-    def __eq__(self, other):
-        if isinstance(other, self.__class__):
-            return self.__dict__ == other.__dict__
-        return NotImplemented
-
-    def __repr__(self):
-        return str(self.__dict__)
-
-    def to_json(self) -> str:
-        return json.dumps(self.__dict__)
+    full_filepath: str
+    """File Path to use to run the callback"""
+    processor_subdir: str | None = None
+    """Directory used by Dag Processor when parsed the dag"""
+    msg: str | None = None
+    """Additional Message that can be used for logging to determine failure/zombie"""

     @classmethod
-    def from_json(cls, json_str: str):
-        json_object = json.loads(json_str)
-        return cls(**json_object)
+    def from_json(cls, data: str | bytes | bytearray) -> Self:
+        return cls.model_validate_json(data)
+
+    def to_json(self, **kwargs) -> str:
+        return self.model_dump_json(**kwargs)

@@ -67,25 +56,12 @@ class TaskCallbackRequest(CallbackRequest):
     A Class with information about the success/failure TI callback to be executed. Currently, only failure
     callbacks (when tasks are externally killed) and Zombies are run via DagFileProcessorProcess.

-    :param full_filepath: File Path to use to run the callback
-    :param simple_task_instance: Simplified Task Instance representation
-    :param msg: Additional Message that can be used for logging to determine failure/zombie
-    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
-    :param task_callback_type: e.g. whether on success, on failure, on retry.
     """

-    def __init__(
-        self,
-        full_filepath: str,
-        simple_task_instance: SimpleTaskInstance,
-        processor_subdir: str | None = None,
-        msg: str | None = None,
-        task_callback_type: TaskInstanceState | None = None,
-    ):
-        super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
-        self.simple_task_instance = simple_task_instance
-        self.task_callback_type = task_callback_type
+    ti: ti_datamodel.TaskInstance
+    """Simplified Task Instance representation"""
+    task_callback_type: TaskInstanceState | None = None
+    """Whether on success, on failure, on retry"""

     @property
     def is_failure_callback(self) -> bool:

@@ -98,42 +74,11 @@ def is_failure_callback(self) -> bool:
             TaskInstanceState.UPSTREAM_FAILED,
         }

-    def to_json(self) -> str:
-        from airflow.serialization.serialized_objects import BaseSerialization
-
-        val = BaseSerialization.serialize(self.__dict__, strict=True)
-        return json.dumps(val)
-
-    @classmethod
-    def from_json(cls, json_str: str):
-        from airflow.serialization.serialized_objects import BaseSerialization
-
-        val = json.loads(json_str)
-        return cls(**BaseSerialization.deserialize(val))
-

 class DagCallbackRequest(CallbackRequest):
-    """
-    A Class with information about the success/failure DAG callback to be executed.
-
-    :param full_filepath: File Path to use to run the callback
-    :param dag_id: DAG ID
-    :param run_id: Run ID for the DagRun
-    :param processor_subdir: Directory used by Dag Processor when parsed the dag.
-    :param is_failure_callback: Flag to determine whether it is a Failure Callback or Success Callback
-    :param msg: Additional Message that can be used for logging
-    """
+    """A Class with information about the success/failure DAG callback to be executed."""

-    def __init__(
-        self,
-        full_filepath: str,
-        dag_id: str,
-        run_id: str,
-        processor_subdir: str | None,
-        is_failure_callback: bool | None = True,
-        msg: str | None = None,
-    ):
-        super().__init__(full_filepath=full_filepath, processor_subdir=processor_subdir, msg=msg)
-        self.dag_id = dag_id
-        self.run_id = run_id
-        self.is_failure_callback = is_failure_callback
+    dag_id: str
+    run_id: str
+    is_failure_callback: bool | None = True
+    """Flag to determine whether it is a Failure Callback or Success Callback"""
airflow/cli/commands/local_commands/dag_processor_command.py (8 changes: 1 addition & 7 deletions)

@@ -19,7 +19,6 @@
 from __future__ import annotations

 import logging
-from datetime import timedelta
 from typing import Any

 from airflow.cli.commands.local_commands.daemon_utils import run_command_with_daemon_option

@@ -36,11 +35,10 @@
 def _create_dag_processor_job_runner(args: Any) -> DagProcessorJobRunner:
     """Create DagFileProcessorProcess instance."""
     processor_timeout_seconds: int = conf.getint("core", "dag_file_processor_timeout")
-    processor_timeout = timedelta(seconds=processor_timeout_seconds)
     return DagProcessorJobRunner(
         job=Job(),
         processor=DagFileProcessorManager(
-            processor_timeout=processor_timeout,
+            processor_timeout=processor_timeout_seconds,
             dag_directory=args.subdir,
             max_runs=args.num_runs,
         ),

@@ -54,10 +52,6 @@ def dag_processor(args):
     if not conf.getboolean("scheduler", "standalone_dag_processor"):
         raise SystemExit("The option [scheduler/standalone_dag_processor] must be True.")

-    sql_conn: str = conf.get("database", "sql_alchemy_conn").lower()
-    if sql_conn.startswith("sqlite"):
-        raise SystemExit("Standalone DagProcessor is not supported when using sqlite.")
-
     job_runner = _create_dag_processor_job_runner(args)

     reload_configuration_for_dag_processing()
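
Two independent simplifications land in this file: DagFileProcessorManager now accepts its timeout as plain seconds, so the call-site timedelta conversion goes away, and the sqlite guard for the standalone processor is removed. A hypothetical stand-in for the new parameter shape (assuming the manager converts internally; the real class may differ):

from datetime import timedelta

class DagFileProcessorManager:  # hypothetical stand-in, not the Airflow class
    def __init__(self, processor_timeout: int, dag_directory: str, max_runs: int):
        # Convert once internally; callers just pass the configured int.
        self._processor_timeout = timedelta(seconds=processor_timeout)
        self._dag_directory = dag_directory
        self._max_runs = max_runs

manager = DagFileProcessorManager(processor_timeout=50, dag_directory="dags/", max_runs=-1)
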