Skip to content

Commit

Permalink
Add dry run for backfill
Browse files Browse the repository at this point in the history
  • Loading branch information
Sneha Prabhu authored and prabhusneha committed Dec 18, 2024
1 parent 3c11168 commit d288934
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 19 deletions.
13 changes: 13 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class BackfillPostBody(BaseModel):
dag_run_conf: dict = {}
reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE
max_active_runs: int = 10
dry_run: bool = False


class BackfillResponse(BaseModel):
Expand All @@ -56,3 +57,15 @@ class BackfillCollectionResponse(BaseModel):

backfills: list[BackfillResponse]
total_entries: int


class BackfillRunInfo(BaseModel):
"""Data model for run information during a backfill operation."""

logical_date: datetime


class BackfillDryRunResponse(BaseModel):
"""Serializer for responses in dry-run mode for backfill operations."""

run_info_list: list[BackfillRunInfo]
32 changes: 31 additions & 1 deletion airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1143,7 +1143,10 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/BackfillResponse'
anyOf:
- $ref: '#/components/schemas/BackfillResponse'
- $ref: '#/components/schemas/BackfillDryRunResponse'
title: Response Create Backfill
'401':
content:
application/json:
Expand Down Expand Up @@ -6210,6 +6213,18 @@ components:
- total_entries
title: BackfillCollectionResponse
description: Backfill Collection serializer for responses.
BackfillDryRunResponse:
properties:
run_info_list:
items:
$ref: '#/components/schemas/BackfillRunInfo'
type: array
title: Run Info List
type: object
required:
- run_info_list
title: BackfillDryRunResponse
description: Serializer for responses in dry-run mode for backfill operations.
BackfillPostBody:
properties:
dag_id:
Expand Down Expand Up @@ -6238,6 +6253,10 @@ components:
type: integer
title: Max Active Runs
default: 10
dry_run:
type: boolean
title: Dry Run
default: false
type: object
required:
- dag_id
Expand Down Expand Up @@ -6301,6 +6320,17 @@ components:
- updated_at
title: BackfillResponse
description: Base serializer for Backfill.
BackfillRunInfo:
properties:
logical_date:
type: string
format: date-time
title: Logical Date
type: object
required:
- logical_date
title: BackfillRunInfo
description: Data model for run information during a backfill operation.
BaseInfoResponse:
properties:
status:
Expand Down
75 changes: 61 additions & 14 deletions airflow/api_fastapi/core_api/routes/public/backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Annotated

from fastapi import Depends, HTTPException, status
from sqlalchemy import select, update
from sqlalchemy import desc, select, update

from airflow.api_fastapi.common.db.common import (
AsyncSessionDep,
Expand All @@ -30,8 +30,10 @@
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.backfills import (
BackfillCollectionResponse,
BackfillDryRunResponse,
BackfillPostBody,
BackfillResponse,
BackfillRunInfo,
)
from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
Expand All @@ -41,9 +43,14 @@
AlreadyRunningBackfill,
Backfill,
BackfillDagRun,
BackfillDagRunExceptionReason,
ReprocessBehavior,
_create_backfill,
_get_info_list,
)
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils import timezone
from airflow.utils.sqlalchemy import nulls_first
from airflow.utils.state import DagRunState

backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills")
Expand Down Expand Up @@ -187,22 +194,62 @@ def cancel_backfill(backfill_id, session: SessionDep) -> BackfillResponse:
)
def create_backfill(
backfill_request: BackfillPostBody,
) -> BackfillResponse:
session: SessionDep,
) -> BackfillResponse | BackfillDryRunResponse:
from_date = timezone.coerce_datetime(backfill_request.from_date)
to_date = timezone.coerce_datetime(backfill_request.to_date)
try:
backfill_obj = _create_backfill(
dag_id=backfill_request.dag_id,
if not backfill_request.dry_run:
try:
backfill_obj = _create_backfill(
dag_id=backfill_request.dag_id,
from_date=from_date,
to_date=to_date,
max_active_runs=backfill_request.max_active_runs,
reverse=backfill_request.run_backwards,
dag_run_conf=backfill_request.dag_run_conf,
reprocess_behavior=backfill_request.reprocess_behavior,
)
return BackfillResponse.model_validate(backfill_obj)
except AlreadyRunningBackfill:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
else:
serdag = session.scalar(SerializedDagModel.latest_item_select_object(backfill_request.dag_id))
if not serdag:
raise HTTPException(status_code=404, detail=f"Could not find dag {backfill_request.dag_id}")

info_list = _get_info_list(
dag=serdag.dag,
from_date=from_date,
to_date=to_date,
max_active_runs=backfill_request.max_active_runs,
reverse=backfill_request.run_backwards,
dag_run_conf=backfill_request.dag_run_conf,
reprocess_behavior=backfill_request.reprocess_behavior,
)
return BackfillResponse.model_validate(backfill_obj)
except AlreadyRunningBackfill:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"There is already a running backfill for dag {backfill_request.dag_id}",
)
backfill_response_item = []
print(info_list)
for info in info_list:
print(info.logical_date)
dr = session.scalar(
select(DagRun)
.where(DagRun.logical_date == info.logical_date)
.order_by(nulls_first(desc(DagRun.start_date), session))
.limit(1)
)

if dr:
non_create_reason = None
if dr.state not in (DagRunState.SUCCESS, DagRunState.FAILED):
non_create_reason = BackfillDagRunExceptionReason.IN_FLIGHT
elif backfill_request.reprocess_behavior is ReprocessBehavior.NONE:
non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
elif backfill_request.reprocess_behavior is ReprocessBehavior.FAILED:
if dr.state != DagRunState.FAILED:
non_create_reason = BackfillDagRunExceptionReason.ALREADY_EXISTS
if not non_create_reason:
backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))

else:
backfill_response_item.append(BackfillRunInfo(logical_date=info.logical_date))

return BackfillDryRunResponse(run_info_list=backfill_response_item)
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,7 @@ export const useAssetServiceCreateAssetEvent = <
* Create Backfill
* @param data The data for the request.
* @param data.requestBody
* @returns BackfillResponse Successful Response
* @returns unknown Successful Response
* @throws ApiError
*/
export const useBackfillServiceCreateBackfill = <
Expand Down
36 changes: 36 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,23 @@ export const $BackfillCollectionResponse = {
description: "Backfill Collection serializer for responses.",
} as const;

export const $BackfillDryRunResponse = {
properties: {
run_info_list: {
items: {
$ref: "#/components/schemas/BackfillRunInfo",
},
type: "array",
title: "Run Info List",
},
},
type: "object",
required: ["run_info_list"],
title: "BackfillDryRunResponse",
description:
"Serializer for responses in dry-run mode for backfill operations.",
} as const;

export const $BackfillPostBody = {
properties: {
dag_id: {
Expand Down Expand Up @@ -385,6 +402,11 @@ export const $BackfillPostBody = {
title: "Max Active Runs",
default: 10,
},
dry_run: {
type: "boolean",
title: "Dry Run",
default: false,
},
},
type: "object",
required: ["dag_id", "from_date", "to_date"],
Expand Down Expand Up @@ -468,6 +490,20 @@ export const $BackfillResponse = {
description: "Base serializer for Backfill.",
} as const;

export const $BackfillRunInfo = {
properties: {
logical_date: {
type: "string",
format: "date-time",
title: "Logical Date",
},
},
type: "object",
required: ["logical_date"],
title: "BackfillRunInfo",
description: "Data model for run information during a backfill operation.",
} as const;

export const $BaseInfoResponse = {
properties: {
status: {
Expand Down
2 changes: 1 addition & 1 deletion airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -837,7 +837,7 @@ export class BackfillService {
* Create Backfill
* @param data The data for the request.
* @param data.requestBody
* @returns BackfillResponse Successful Response
* @returns unknown Successful Response
* @throws ApiError
*/
public static createBackfill(
Expand Down
19 changes: 17 additions & 2 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ export type BackfillCollectionResponse = {
total_entries: number;
};

/**
* Serializer for responses in dry-run mode for backfill operations.
*/
export type BackfillDryRunResponse = {
run_info_list: Array<BackfillRunInfo>;
};

/**
* Object used for create backfill request.
*/
Expand All @@ -110,6 +117,7 @@ export type BackfillPostBody = {
};
reprocess_behavior?: ReprocessBehavior;
max_active_runs?: number;
dry_run?: boolean;
};

/**
Expand All @@ -131,6 +139,13 @@ export type BackfillResponse = {
updated_at: string;
};

/**
* Data model for run information during a backfill operation.
*/
export type BackfillRunInfo = {
logical_date: string;
};

/**
* Base info serializer for responses.
*/
Expand Down Expand Up @@ -1498,7 +1513,7 @@ export type CreateBackfillData = {
requestBody: BackfillPostBody;
};

export type CreateBackfillResponse = BackfillResponse;
export type CreateBackfillResponse = BackfillResponse | BackfillDryRunResponse;

export type GetBackfillData = {
backfillId: string;
Expand Down Expand Up @@ -2630,7 +2645,7 @@ export type $OpenApiTs = {
/**
* Successful Response
*/
200: BackfillResponse;
200: BackfillResponse | BackfillDryRunResponse;
/**
* Unauthorized
*/
Expand Down
1 change: 1 addition & 0 deletions tests/api_fastapi/core_api/routes/public/test_backfills.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ def test_create_backfill(self, repro_act, repro_exp, session, dag_maker, test_cl
"max_active_runs": max_active_runs,
"run_backwards": False,
"dag_run_conf": {"param1": "val1", "param2": True},
"dry_run": False,
}
if repro_act is not None:
data["reprocess_behavior"] = repro_act
Expand Down

0 comments on commit d288934

Please sign in to comment.