From d28893413149085cc8a9278378619c39fda5efc0 Mon Sep 17 00:00:00 2001 From: Sneha Prabhu Date: Thu, 19 Dec 2024 03:58:37 +0530 Subject: [PATCH] Add dry run for backfill --- .../core_api/datamodels/backfills.py | 13 ++++ .../core_api/openapi/v1-generated.yaml | 32 +++++++- .../core_api/routes/public/backfills.py | 75 +++++++++++++++---- airflow/ui/openapi-gen/queries/queries.ts | 2 +- .../ui/openapi-gen/requests/schemas.gen.ts | 36 +++++++++ .../ui/openapi-gen/requests/services.gen.ts | 2 +- airflow/ui/openapi-gen/requests/types.gen.ts | 19 ++++- .../core_api/routes/public/test_backfills.py | 1 + 8 files changed, 161 insertions(+), 19 deletions(-) diff --git a/airflow/api_fastapi/core_api/datamodels/backfills.py b/airflow/api_fastapi/core_api/datamodels/backfills.py index be04063907a9d..c5f6bd86ea242 100644 --- a/airflow/api_fastapi/core_api/datamodels/backfills.py +++ b/airflow/api_fastapi/core_api/datamodels/backfills.py @@ -33,6 +33,7 @@ class BackfillPostBody(BaseModel): dag_run_conf: dict = {} reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE max_active_runs: int = 10 + dry_run: bool = False class BackfillResponse(BaseModel): @@ -56,3 +57,15 @@ class BackfillCollectionResponse(BaseModel): backfills: list[BackfillResponse] total_entries: int + + +class BackfillRunInfo(BaseModel): + """Data model for run information during a backfill operation.""" + + logical_date: datetime + + +class BackfillDryRunResponse(BaseModel): + """Serializer for responses in dry-run mode for backfill operations.""" + + run_info_list: list[BackfillRunInfo] diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 980b045e34d06..bdc6ae842bc60 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -1143,7 +1143,10 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BackfillResponse' + anyOf: + - $ref: '#/components/schemas/BackfillResponse' + - $ref: '#/components/schemas/BackfillDryRunResponse' + title: Response Create Backfill '401': content: application/json: @@ -6210,6 +6213,18 @@ components: - total_entries title: BackfillCollectionResponse description: Backfill Collection serializer for responses. + BackfillDryRunResponse: + properties: + run_info_list: + items: + $ref: '#/components/schemas/BackfillRunInfo' + type: array + title: Run Info List + type: object + required: + - run_info_list + title: BackfillDryRunResponse + description: Serializer for responses in dry-run mode for backfill operations. BackfillPostBody: properties: dag_id: @@ -6238,6 +6253,10 @@ components: type: integer title: Max Active Runs default: 10 + dry_run: + type: boolean + title: Dry Run + default: false type: object required: - dag_id @@ -6301,6 +6320,17 @@ components: - updated_at title: BackfillResponse description: Base serializer for Backfill. + BackfillRunInfo: + properties: + logical_date: + type: string + format: date-time + title: Logical Date + type: object + required: + - logical_date + title: BackfillRunInfo + description: Data model for run information during a backfill operation. BaseInfoResponse: properties: status: diff --git a/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow/api_fastapi/core_api/routes/public/backfills.py index 61d25597cdd15..4d8a0b1d33daf 100644 --- a/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -19,7 +19,7 @@ from typing import Annotated from fastapi import Depends, HTTPException, status -from sqlalchemy import select, update +from sqlalchemy import desc, select, update from airflow.api_fastapi.common.db.common import ( AsyncSessionDep, @@ -30,8 +30,10 @@ from airflow.api_fastapi.common.router import AirflowRouter from airflow.api_fastapi.core_api.datamodels.backfills import ( BackfillCollectionResponse, + BackfillDryRunResponse, BackfillPostBody, BackfillResponse, + BackfillRunInfo, ) from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, @@ -41,9 +43,14 @@ AlreadyRunningBackfill, Backfill, BackfillDagRun, + BackfillDagRunExceptionReason, + ReprocessBehavior, _create_backfill, + _get_info_list, ) +from airflow.models.serialized_dag import SerializedDagModel from airflow.utils import timezone +from airflow.utils.sqlalchemy import nulls_first from airflow.utils.state import DagRunState backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") @@ -187,22 +194,62 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse: ) def create_backfill( backfill_request: BackfillPostBody, -) -> BackfillResponse: + session: SessionDep, +) -> BackfillResponse | BackfillDryRunResponse: from_date = timezone.coerce_datetime(backfill_request.from_date) to_date = timezone.coerce_datetime(backfill_request.to_date) - try: - backfill_obj = _create_backfill( - dag_id=backfill_request.dag_id, + if not backfill_request.dry_run: + try: + backfill_obj = _create_backfill( + dag_id=backfill_request.dag_id, + from_date=from_date, + to_date=to_date, + max_active_runs=backfill_request.max_active_runs, + reverse=backfill_request.run_backwards, + dag_run_conf=backfill_request.dag_run_conf, + reprocess_behavior=backfill_request.reprocess_behavior, + ) + return BackfillResponse.model_validate(backfill_obj) + except AlreadyRunningBackfill: + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"There is already a running backfill for dag {backfill_request.dag_id}", + ) + else: + serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id)) + if not serdag: + raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}") + + info_list = _get_info_list( + dag=serdag.dag, from_date=from_date, to_date=to_date, - max_active_runs=backfill_request.max_active_runs, reverse=backfill_request.run_backwards, - dag_run_conf=backfill_request.dag_run_conf, - reprocess_behavior=backfill_request.reprocess_behavior, - ) - return BackfillResponse.model_validate(backfill_obj) - except AlreadyRunningBackfill: - raise HTTPException( - status_code=status.HTTP_409_CONFLICT, - detail=f"There is already a running backfill for dag {backfill_request.dag_id}", ) + backfill_response_item = [] + print(info_list) + for info in info_list: + print(info.logical_date) + dr = session.scalar( + select(DagRun) + .where(DagRun.logical_date == info.logical_date) + .order_by(nulls_first(desc(DagRun.start_date), session)) + .limit(1) + ) + + if dr: + non_create_reason = None + if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED): + non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT + elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED: + if dr.state != DagRunState.FAILED: + non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS + if not non_create_reason: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + else: + backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date)) + + return BackfillDryRunResponse(run_info_list=backfill_response_item) diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 216c165ae330f..a41499117674a 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -2779,7 +2779,7 @@ export const useAssetServiceCreateAssetEvent = < * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns BackfillResponse Successful Response + * @returns unknown Successful Response * @throws ApiError */ export const useBackfillServiceCreateBackfill = < diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index defc76ec7345f..f842c5c7c8352 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -350,6 +350,23 @@ export const $BackfillCollectionResponse = { description: "Backfill Collection serializer for responses.", } as const; +export const $BackfillDryRunResponse = { + properties: { + run_info_list: { + items: { + $ref: "#/components/schemas/BackfillRunInfo", + }, + type: "array", + title: "Run Info List", + }, + }, + type: "object", + required: ["run_info_list"], + title: "BackfillDryRunResponse", + description: + "Serializer for responses in dry-run mode for backfill operations.", +} as const; + export const $BackfillPostBody = { properties: { dag_id: { @@ -385,6 +402,11 @@ export const $BackfillPostBody = { title: "Max Active Runs", default: 10, }, + dry_run: { + type: "boolean", + title: "Dry Run", + default: false, + }, }, type: "object", required: ["dag_id", "from_date", "to_date"], @@ -468,6 +490,20 @@ export const $BackfillResponse = { description: "Base serializer for Backfill.", } as const; +export const $BackfillRunInfo = { + properties: { + logical_date: { + type: "string", + format: "date-time", + title: "Logical Date", + }, + }, + type: "object", + required: ["logical_date"], + title: "BackfillRunInfo", + description: "Data model for run information during a backfill operation.", +} as const; + export const $BaseInfoResponse = { properties: { status: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index c048ddb023053..b967f19258e32 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -837,7 +837,7 @@ export class BackfillService { * Create Backfill * @param data The data for the request. * @param data.requestBody - * @returns BackfillResponse Successful Response + * @returns unknown Successful Response * @throws ApiError */ public static createBackfill( diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 9aeda695c3554..b1f17c39c86e7 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -97,6 +97,13 @@ export type BackfillCollectionResponse = { total_entries: number; }; +/** + * Serializer for responses in dry-run mode for backfill operations. + */ +export type BackfillDryRunResponse = { + run_info_list: Array; +}; + /** * Object used for create backfill request. */ @@ -110,6 +117,7 @@ export type BackfillPostBody = { }; reprocess_behavior?: ReprocessBehavior; max_active_runs?: number; + dry_run?: boolean; }; /** @@ -131,6 +139,13 @@ export type BackfillResponse = { updated_at: string; }; +/** + * Data model for run information during a backfill operation. + */ +export type BackfillRunInfo = { + logical_date: string; +}; + /** * Base info serializer for responses. */ @@ -1498,7 +1513,7 @@ export type CreateBackfillData = { requestBody: BackfillPostBody; }; -export type CreateBackfillResponse = BackfillResponse; +export type CreateBackfillResponse = BackfillResponse | BackfillDryRunResponse; export type GetBackfillData = { backfillId: string; @@ -2630,7 +2645,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: BackfillResponse; + 200: BackfillResponse | BackfillDryRunResponse; /** * Unauthorized */ diff --git a/tests/api_fastapi/core_api/routes/public/test_backfills.py b/tests/api_fastapi/core_api/routes/public/test_backfills.py index 1c64b10848f4b..a711c13ff7010 100644 --- a/tests/api_fastapi/core_api/routes/public/test_backfills.py +++ b/tests/api_fastapi/core_api/routes/public/test_backfills.py @@ -192,6 +192,7 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl "max_active_runs": max_active_runs, "run_backwards": False, "dag_run_conf": {"param1": "val1", "param2": True}, + "dry_run": False, } if repro_act is not None: data["reprocess_behavior"] = repro_act