|
17 | 17 | from __future__ import annotations
|
18 | 18 |
|
19 | 19 | import os
|
20 |
| -from typing import Annotated |
| 20 | +from typing import Annotated, cast |
21 | 21 |
|
22 | 22 | from fastapi import Depends, HTTPException, Query, status
|
23 | 23 | from sqlalchemy import select
|
|
27 | 27 | from airflow.api_fastapi.common.router import AirflowRouter
|
28 | 28 | from airflow.api_fastapi.core_api.datamodels.connections import (
|
29 | 29 | ConnectionBody,
|
| 30 | + ConnectionBulkBody, |
30 | 31 | ConnectionCollectionResponse,
|
31 | 32 | ConnectionResponse,
|
32 | 33 | ConnectionTestResponse,
|
|
35 | 36 | from airflow.configuration import conf
|
36 | 37 | from airflow.models import Connection
|
37 | 38 | from airflow.secrets.environment_variables import CONN_ENV_PREFIX
|
38 |
| -from airflow.utils import helpers |
39 | 39 | from airflow.utils.strings import get_random_string
|
40 | 40 |
|
41 | 41 | connections_router = AirflowRouter(tags=["Connection"], prefix="/connections")
|
@@ -126,24 +126,29 @@ def post_connection(
|
126 | 126 | session: SessionDep,
|
127 | 127 | ) -> ConnectionResponse:
|
128 | 128 | """Create connection entry."""
|
129 |
| - try: |
130 |
| - helpers.validate_key(post_body.connection_id, max_length=200) |
131 |
| - except Exception as e: |
132 |
| - raise HTTPException(status.HTTP_400_BAD_REQUEST, f"{e}") |
133 |
| - |
134 |
| - connection = session.scalar(select(Connection).filter_by(conn_id=post_body.connection_id)) |
135 |
| - if connection is not None: |
136 |
| - raise HTTPException( |
137 |
| - status.HTTP_409_CONFLICT, |
138 |
| - f"Connection with connection_id: `{post_body.connection_id}` already exists", |
139 |
| - ) |
140 |
| - |
141 | 129 | connection = Connection(**post_body.model_dump(by_alias=True))
|
142 | 130 | session.add(connection)
|
143 |
| - |
144 | 131 | return connection
|
145 | 132 |
|
146 | 133 |
|
| 134 | +@connections_router.post( |
| 135 | + "/bulk", |
| 136 | + status_code=status.HTTP_201_CREATED, |
| 137 | + responses=create_openapi_http_exception_doc([status.HTTP_409_CONFLICT]), |
| 138 | +) |
| 139 | +def post_connections( |
| 140 | + post_body: ConnectionBulkBody, |
| 141 | + session: SessionDep, |
| 142 | +) -> ConnectionCollectionResponse: |
| 143 | + """Create connection entry.""" |
| 144 | + connections = [Connection(**body.model_dump(by_alias=True)) for body in post_body.connections] |
| 145 | + session.add_all(connections) |
| 146 | + return ConnectionCollectionResponse( |
| 147 | + connections=cast(list[ConnectionResponse], connections), |
| 148 | + total_entries=len(connections), |
| 149 | + ) |
| 150 | + |
| 151 | + |
147 | 152 | @connections_router.patch(
|
148 | 153 | "/{connection_id}",
|
149 | 154 | responses=create_openapi_http_exception_doc(
|
|
0 commit comments