@@ -1102,7 +1102,7 @@ def check_conn_type_null(session: Session) -> Iterable[str]:
1102
1102
1103
1103
n_nulls = []
1104
1104
try :
1105
- n_nulls = session .execute (select (Connection .conn_id ).filter (Connection .conn_type .is_ (None ))).all ()
1105
+ n_nulls = session .scalars (select (Connection .conn_id ).where (Connection .conn_type .is_ (None ))).all ()
1106
1106
except (exc .OperationalError , exc .ProgrammingError , exc .InternalError ):
1107
1107
# fallback if tables hasn't been created yet
1108
1108
session .rollback ()
@@ -1144,7 +1144,7 @@ def check_run_id_null(session: Session) -> Iterable[str]:
1144
1144
dagrun_table .c .run_id .is_ (None ),
1145
1145
dagrun_table .c .execution_date .is_ (None ),
1146
1146
)
1147
- invalid_dagrun_count = session .scalar (select (func .count (dagrun_table .c .id )).filter (invalid_dagrun_filter ))
1147
+ invalid_dagrun_count = session .scalar (select (func .count (dagrun_table .c .id )).where (invalid_dagrun_filter ))
1148
1148
if invalid_dagrun_count > 0 :
1149
1149
dagrun_dangling_table_name = _format_airflow_moved_table_name (dagrun_table .name , "2.2" , "dangling" )
1150
1150
if dagrun_dangling_table_name in inspect (session .get_bind ()).get_table_names ():
@@ -1335,7 +1335,7 @@ def _move_duplicate_data_to_new_table(
1335
1335
dialect_name = bind .dialect .name
1336
1336
1337
1337
query = (
1338
- session . select (* [getattr (source_table .c , x .name ).label (str (x .name )) for x in source_table .columns ])
1338
+ select (* [getattr (source_table .c , x .name ).label (str (x .name )) for x in source_table .columns ])
1339
1339
.select_from (source_table )
1340
1340
.join (subquery , and_ (* [getattr (source_table .c , x ) == getattr (subquery .c , x ) for x in uniqueness ]))
1341
1341
)
0 commit comments