From c8f33d8beb8f95f05f488ddc0ec89321ed748779 Mon Sep 17 00:00:00 2001 From: Ephraim Anierobi Date: Mon, 16 Feb 2026 10:56:10 +0100 Subject: [PATCH] Fix scheduler heartbeat misses caused by slow reschedule dependency check When many task instances enter UP_FOR_RESCHEDULE state, the query to fetch the latest reschedule date becomes slow due to a missing composite index. This causes the scheduler to miss heartbeats. Previously only sensors used reschedule mode, but since fddf4a72406fda039dea813c0ff6e58dd0cd2891, non-sensor tasks can also be rescheduled, significantly increasing the number of rows per task instance in the task_reschedule table. Add a composite (ti_id, id DESC) index to the task_reschedule table, replacing the single-column (ti_id) index. --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/docs/img/airflow_erd.svg | 1692 ++++++++--------- airflow-core/docs/migrations-ref.rst | 8 +- ...1_8_add_index_to_task_reschedule_ti_id.py} | 38 +- ...replace_asset_trigger_table_with_asset.py} | 4 +- ...ge_serialized_dag_data_column_to_jsonb.py} | 0 ...add_length_dag_bundle_team_bundle_name.py} | 0 ...0_add_human_in_the_loop_detail_history.py} | 0 ... 0091_3_2_0_add_fail_fast_to_dag_table.py} | 0 ... 0092_3_2_0_restructure_callback_table.py} | 0 ...ace_deadline_inline_callback_with_fkey.py} | 0 ...94_3_2_0_update_orm_asset_partitioning.py} | 0 ...eam_id.py => 0095_3_2_0_remove_team_id.py} | 0 ...ce_log_event_and_dag_is_stale_not_null.py} | 0 ...0097_3_2_0_add_queue_column_to_trigger.py} | 0 ...add_exceeds_max_runs_flag_to_dag_model.py} | 0 ...0_add_timetable_type_to_dag_table_for_.py} | 0 ...00_3_2_0_ui_improvements_for_deadlines.py} | 0 ...01_3_2_0_make_external_executor_id_text.py | 4 +- .../src/airflow/models/taskreschedule.py | 2 +- airflow-core/src/airflow/utils/db.py | 1 + 21 files changed, 885 insertions(+), 866 deletions(-) rename airflow-core/src/airflow/migrations/versions/{0100_3_2_0_add_index_to_task_reschedule_ti_id.py => 0086_3_1_8_add_index_to_task_reschedule_ti_id.py} (51%) rename airflow-core/src/airflow/migrations/versions/{0086_3_2_0_replace_asset_trigger_table_with_asset.py => 0087_3_2_0_replace_asset_trigger_table_with_asset.py} (98%) rename airflow-core/src/airflow/migrations/versions/{0087_3_2_0_change_serialized_dag_data_column_to_jsonb.py => 0088_3_2_0_change_serialized_dag_data_column_to_jsonb.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0088_3_2_0_add_length_dag_bundle_team_bundle_name.py => 0089_3_2_0_add_length_dag_bundle_team_bundle_name.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0089_3_2_0_add_human_in_the_loop_detail_history.py => 0090_3_2_0_add_human_in_the_loop_detail_history.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0090_3_2_0_add_fail_fast_to_dag_table.py => 0091_3_2_0_add_fail_fast_to_dag_table.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0091_3_2_0_restructure_callback_table.py => 0092_3_2_0_restructure_callback_table.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0092_3_2_0_replace_deadline_inline_callback_with_fkey.py => 0093_3_2_0_replace_deadline_inline_callback_with_fkey.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0093_3_2_0_update_orm_asset_partitioning.py => 0094_3_2_0_update_orm_asset_partitioning.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0094_3_2_0_remove_team_id.py => 0095_3_2_0_remove_team_id.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0095_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py => 0096_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0096_3_2_0_add_queue_column_to_trigger.py => 0097_3_2_0_add_queue_column_to_trigger.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0097_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py => 0098_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0098_3_2_0_add_timetable_type_to_dag_table_for_.py => 0099_3_2_0_add_timetable_type_to_dag_table_for_.py} (100%) rename airflow-core/src/airflow/migrations/versions/{0099_3_2_0_ui_improvements_for_deadlines.py => 0100_3_2_0_ui_improvements_for_deadlines.py} (100%) diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index e4b00e9e89384..b7e783e54fe96 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -3c93ed6b4ef29ae8f69e3769b59dbd1f2d56ec7c3db7fa86720016d81ffd1058 \ No newline at end of file +8901cbe69c2bc259020d9cde171128b719388d28aa3440c73a5100d23531b6ea \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index 5918826e5a0d0..400637d531e6a 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -271,7 +271,7 @@ NOT NULL - + dag_bundle:name--dag_bundle_team:dag_bundle_name 0..N @@ -429,7 +429,7 @@ NOT NULL - + team:name--dag_bundle_team:team_name 0..N @@ -698,7 +698,7 @@ NOT NULL - + trigger:id--asset_watcher:trigger_id 0..N @@ -707,178 +707,178 @@ task_instance - -task_instance - -id - - [UUID] - NOT NULL - -context_carrier - - [JSONB] - -custom_operator_name - - [VARCHAR(1000)] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_version_id - - [UUID] - -duration - - [DOUBLE PRECISION] - -end_date - - [TIMESTAMP] - -executor - - [VARCHAR(1000)] - -executor_config - - [BYTEA] - NOT NULL - -external_executor_id - - [TEXT] - -hostname - - [VARCHAR(1000)] - NOT NULL - -last_heartbeat_at - - [TIMESTAMP] - -map_index - - [INTEGER] - NOT NULL - -max_tries - - [INTEGER] - NOT NULL - -next_kwargs - - [JSONB] - -next_method - - [VARCHAR(1000)] - -operator - - [VARCHAR(1000)] - -pid - - [INTEGER] - -pool - - [VARCHAR(256)] - NOT NULL - -pool_slots - - [INTEGER] - NOT NULL - -priority_weight - - [INTEGER] - NOT NULL - -queue - - [VARCHAR(256)] - NOT NULL - -queued_by_job_id - - [INTEGER] - -queued_dttm - - [TIMESTAMP] - -rendered_map_index - - [VARCHAR(250)] - -run_id - - [VARCHAR(250)] - NOT NULL - -scheduled_dttm - - [TIMESTAMP] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(20)] - -task_display_name - - [VARCHAR(2000)] - -task_id - - [VARCHAR(250)] - NOT NULL - -trigger_id - - [INTEGER] - -trigger_timeout - - [TIMESTAMP] - -try_number - - [INTEGER] - NOT NULL - -unixname - - [VARCHAR(1000)] - NOT NULL - -updated_at - - [TIMESTAMP] + +task_instance + +id + + [UUID] + NOT NULL + +context_carrier + + [JSONB] + +custom_operator_name + + [VARCHAR(1000)] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_version_id + + [UUID] + +duration + + [DOUBLE PRECISION] + +end_date + + [TIMESTAMP] + +executor + + [VARCHAR(1000)] + +executor_config + + [BYTEA] + NOT NULL + +external_executor_id + + [TEXT] + +hostname + + [VARCHAR(1000)] + NOT NULL + +last_heartbeat_at + + [TIMESTAMP] + +map_index + + [INTEGER] + NOT NULL + +max_tries + + [INTEGER] + NOT NULL + +next_kwargs + + [JSONB] + +next_method + + [VARCHAR(1000)] + +operator + + [VARCHAR(1000)] + +pid + + [INTEGER] + +pool + + [VARCHAR(256)] + NOT NULL + +pool_slots + + [INTEGER] + NOT NULL + +priority_weight + + [INTEGER] + NOT NULL + +queue + + [VARCHAR(256)] + NOT NULL + +queued_by_job_id + + [INTEGER] + +queued_dttm + + [TIMESTAMP] + +rendered_map_index + + [VARCHAR(250)] + +run_id + + [VARCHAR(250)] + NOT NULL + +scheduled_dttm + + [TIMESTAMP] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(20)] + +task_display_name + + [VARCHAR(2000)] + +task_id + + [VARCHAR(250)] + NOT NULL + +trigger_id + + [INTEGER] + +trigger_timeout + + [TIMESTAMP] + +try_number + + [INTEGER] + NOT NULL + +unixname + + [VARCHAR(1000)] + NOT NULL + +updated_at + + [TIMESTAMP] - + trigger:id--task_instance:trigger_id - -0..N + +0..N {0,1} @@ -935,102 +935,102 @@ asset_alias - -asset_alias - -id - - [INTEGER] - NOT NULL - -group - - [VARCHAR(1500)] - NOT NULL - -name - - [VARCHAR(1500)] - NOT NULL + +asset_alias + +id + + [INTEGER] + NOT NULL + +group + + [VARCHAR(1500)] + NOT NULL + +name + + [VARCHAR(1500)] + NOT NULL asset_alias_asset - -asset_alias_asset - -alias_id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL + +asset_alias_asset + +alias_id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL - + asset_alias:id--asset_alias_asset:alias_id - -0..N -1 + +0..N +1 asset_alias_asset_event - -asset_alias_asset_event - -alias_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +asset_alias_asset_event + +alias_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL - + asset_alias:id--asset_alias_asset_event:alias_id - -0..N -1 + +0..N +1 dag_schedule_asset_alias_reference - -dag_schedule_asset_alias_reference - -alias_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_alias_reference + +alias_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + asset_alias:id--dag_schedule_asset_alias_reference:alias_id - -0..N -1 + +0..N +1 @@ -1074,261 +1074,261 @@ NOT NULL - + asset:id--asset_alias_asset:asset_id - -0..N + +0..N 1 - + asset:id--asset_watcher:asset_id - + 0..N 1 asset_active - -asset_active - -name - - [VARCHAR(1500)] - NOT NULL - -uri - - [VARCHAR(1500)] - NOT NULL + +asset_active + +name + + [VARCHAR(1500)] + NOT NULL + +uri + + [VARCHAR(1500)] + NOT NULL -asset:uri--asset_active:uri - -1 -1 +asset:name--asset_active:name + +1 +1 -asset:name--asset_active:name - -1 -1 +asset:uri--asset_active:uri + +1 +1 dag_schedule_asset_reference - -dag_schedule_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_schedule_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL - + asset:id--dag_schedule_asset_reference:asset_id - -0..N + +0..N 1 task_outlet_asset_reference - -task_outlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_outlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset:id--task_outlet_asset_reference:asset_id - -0..N + +0..N 1 task_inlet_asset_reference - -task_inlet_asset_reference - -asset_id - - [INTEGER] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -task_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +task_inlet_asset_reference + +asset_id + + [INTEGER] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +task_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL asset:id--task_inlet_asset_reference:asset_id - -0..N + +0..N 1 asset_dag_run_queue - -asset_dag_run_queue - -asset_id - - [INTEGER] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL + +asset_dag_run_queue + +asset_id + + [INTEGER] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL - + asset:id--asset_dag_run_queue:asset_id - -0..N + +0..N 1 asset_event - -asset_event - -id - - [INTEGER] - NOT NULL - -asset_id - - [INTEGER] - NOT NULL - -extra - - [JSON] - NOT NULL - -partition_key - - [VARCHAR(250)] - -source_dag_id - - [VARCHAR(250)] - -source_map_index - - [INTEGER] - -source_run_id - - [VARCHAR(250)] - -source_task_id - - [VARCHAR(250)] - -timestamp - - [TIMESTAMP] - NOT NULL + +asset_event + +id + + [INTEGER] + NOT NULL + +asset_id + + [INTEGER] + NOT NULL + +extra + + [JSON] + NOT NULL + +partition_key + + [VARCHAR(250)] + +source_dag_id + + [VARCHAR(250)] + +source_map_index + + [INTEGER] + +source_run_id + + [VARCHAR(250)] + +source_task_id + + [VARCHAR(250)] + +timestamp + + [TIMESTAMP] + NOT NULL - + asset_event:id--asset_alias_asset_event:event_id - -0..N -1 + +0..N +1 dagrun_asset_event - -dagrun_asset_event - -dag_run_id - - [INTEGER] - NOT NULL - -event_id - - [INTEGER] - NOT NULL + +dagrun_asset_event + +dag_run_id + + [INTEGER] + NOT NULL + +event_id + + [INTEGER] + NOT NULL asset_event:id--dagrun_asset_event:event_id - -0..N -1 + +0..N +1 @@ -1387,84 +1387,84 @@ 1 - + dag:dag_id--dag_schedule_asset_alias_reference:dag_id - -0..N + +0..N 1 - + dag:dag_id--dag_schedule_asset_reference:dag_id - -0..N + +0..N 1 dag:dag_id--task_outlet_asset_reference:dag_id - -0..N + +0..N 1 dag:dag_id--task_inlet_asset_reference:dag_id - -0..N + +0..N 1 - + dag:dag_id--asset_dag_run_queue:target_dag_id - -0..N + +0..N 1 dag_version - -dag_version - -id - - [UUID] - NOT NULL - -bundle_name - - [VARCHAR(250)] - -bundle_version - - [VARCHAR(250)] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -last_updated - - [TIMESTAMP] - NOT NULL - -version_number - - [INTEGER] - NOT NULL + +dag_version + +id + + [UUID] + NOT NULL + +bundle_name + + [VARCHAR(250)] + +bundle_version + + [VARCHAR(250)] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +last_updated + + [TIMESTAMP] + NOT NULL + +version_number + + [INTEGER] + NOT NULL dag:dag_id--dag_version:dag_id - -0..N + +0..N 1 @@ -1577,133 +1577,133 @@ dag_run - -dag_run - -id - - [INTEGER] - NOT NULL - -backfill_id - - [INTEGER] - -bundle_version - - [VARCHAR(250)] - -clear_number - - [INTEGER] - NOT NULL - -conf - - [JSONB] - -context_carrier - - [JSONB] - -created_dag_version_id - - [UUID] - -creating_job_id - - [INTEGER] - -dag_id - - [VARCHAR(250)] - NOT NULL - -data_interval_end - - [TIMESTAMP] - -data_interval_start - - [TIMESTAMP] - -end_date - - [TIMESTAMP] - -last_scheduling_decision - - [TIMESTAMP] - -log_template_id - - [INTEGER] - NOT NULL - -logical_date - - [TIMESTAMP] - -partition_key - - [VARCHAR(250)] - -queued_at - - [TIMESTAMP] - -run_after - - [TIMESTAMP] - NOT NULL - -run_id - - [VARCHAR(250)] - NOT NULL - -run_type - - [VARCHAR(50)] - NOT NULL - -scheduled_by_job_id - - [INTEGER] - -span_status - - [VARCHAR(250)] - NOT NULL - -start_date - - [TIMESTAMP] - -state - - [VARCHAR(50)] - NOT NULL - -triggered_by - - [VARCHAR(50)] - -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] - NOT NULL + +dag_run + +id + + [INTEGER] + NOT NULL + +backfill_id + + [INTEGER] + +bundle_version + + [VARCHAR(250)] + +clear_number + + [INTEGER] + NOT NULL + +conf + + [JSONB] + +context_carrier + + [JSONB] + +created_dag_version_id + + [UUID] + +creating_job_id + + [INTEGER] + +dag_id + + [VARCHAR(250)] + NOT NULL + +data_interval_end + + [TIMESTAMP] + +data_interval_start + + [TIMESTAMP] + +end_date + + [TIMESTAMP] + +last_scheduling_decision + + [TIMESTAMP] + +log_template_id + + [INTEGER] + NOT NULL + +logical_date + + [TIMESTAMP] + +partition_key + + [VARCHAR(250)] + +queued_at + + [TIMESTAMP] + +run_after + + [TIMESTAMP] + NOT NULL + +run_id + + [VARCHAR(250)] + NOT NULL + +run_type + + [VARCHAR(50)] + NOT NULL + +scheduled_by_job_id + + [INTEGER] + +span_status + + [VARCHAR(250)] + NOT NULL + +start_date + + [TIMESTAMP] + +state + + [VARCHAR(50)] + NOT NULL + +triggered_by + + [VARCHAR(50)] + +triggering_user_name + + [VARCHAR(512)] + +updated_at + + [TIMESTAMP] + NOT NULL dag_version:id--dag_run:created_dag_version_id - -0..N -{0,1} + +0..N +{0,1} @@ -1754,9 +1754,9 @@ dag_version:id--dag_code:dag_version_id - + 0..N -1 +1 @@ -1805,112 +1805,112 @@ dag_version:id--serialized_dag:dag_version_id - + 0..N -1 +1 - + dag_version:id--task_instance:dag_version_id - -0..N -{0,1} + +0..N +{0,1} log_template - -log_template - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -elasticsearch_id - - [TEXT] - NOT NULL - -filename - - [TEXT] - NOT NULL + +log_template + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +elasticsearch_id + + [TEXT] + NOT NULL + +filename + + [TEXT] + NOT NULL log_template:id--dag_run:log_template_id - -0..N -1 + +0..N +1 dag_run:id--dagrun_asset_event:dag_run_id - -0..N -1 + +0..N +1 asset_partition_dag_run - -asset_partition_dag_run - -id - - [INTEGER] - NOT NULL - -created_at - - [TIMESTAMP] - NOT NULL - -created_dag_run_id - - [INTEGER] - -partition_key - - [VARCHAR(250)] - NOT NULL - -target_dag_id - - [VARCHAR(250)] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL + +asset_partition_dag_run + +id + + [INTEGER] + NOT NULL + +created_at + + [TIMESTAMP] + NOT NULL + +created_dag_run_id + + [INTEGER] + +partition_key + + [VARCHAR(250)] + NOT NULL + +target_dag_id + + [VARCHAR(250)] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL dag_run:id--asset_partition_dag_run:created_dag_run_id - -0..N -{0,1} + +0..N +{0,1} - -dag_run:run_id--task_instance:run_id - -0..N -1 + +dag_run:dag_id--task_instance:dag_id + +0..N +1 - -dag_run:dag_id--task_instance:dag_id - -0..N -1 + +dag_run:run_id--task_instance:run_id + +0..N +1 @@ -1947,131 +1947,131 @@ NOT NULL - + dag_run:id--backfill_dag_run:dag_run_id - + 0..N -{0,1} +{0,1} dag_run_note - -dag_run_note - -dag_run_id - - [INTEGER] - NOT NULL - -content - - [VARCHAR(1000)] - -created_at - - [TIMESTAMP] - NOT NULL - -updated_at - - [TIMESTAMP] - NOT NULL - -user_id - - [VARCHAR(128)] + +dag_run_note + +dag_run_id + + [INTEGER] + NOT NULL + +content + + [VARCHAR(1000)] + +created_at + + [TIMESTAMP] + NOT NULL + +updated_at + + [TIMESTAMP] + NOT NULL + +user_id + + [VARCHAR(128)] dag_run:id--dag_run_note:dag_run_id - -1 -1 + +1 +1 dag_run:id--deadline:dagrun_id - + 0..N -{0,1} +{0,1} backfill - -backfill - -id - - [INTEGER] - NOT NULL - -completed_at - - [TIMESTAMP] - -created_at - - [TIMESTAMP] - NOT NULL - -dag_id - - [VARCHAR(250)] - NOT NULL - -dag_run_conf - - [JSON] - NOT NULL - -from_date - - [TIMESTAMP] - NOT NULL - -is_paused - - [BOOLEAN] - -max_active_runs - - [INTEGER] - NOT NULL - -reprocess_behavior - - [VARCHAR(250)] - NOT NULL - -to_date - - [TIMESTAMP] - NOT NULL - -triggering_user_name - - [VARCHAR(512)] - -updated_at - - [TIMESTAMP] - NOT NULL + +backfill + +id + + [INTEGER] + NOT NULL + +completed_at + + [TIMESTAMP] + +created_at + + [TIMESTAMP] + NOT NULL + +dag_id + + [VARCHAR(250)] + NOT NULL + +dag_run_conf + + [JSON] + NOT NULL + +from_date + + [TIMESTAMP] + NOT NULL + +is_paused + + [BOOLEAN] + +max_active_runs + + [INTEGER] + NOT NULL + +reprocess_behavior + + [VARCHAR(250)] + NOT NULL + +to_date + + [TIMESTAMP] + NOT NULL + +triggering_user_name + + [VARCHAR(512)] + +updated_at + + [TIMESTAMP] + NOT NULL backfill:id--dag_run:backfill_id - -0..N -{0,1} + +0..N +{0,1} - + backfill:id--backfill_dag_run:backfill_id - + 0..N -1 +1 @@ -2198,9 +2198,9 @@ task_instance:id--hitl_detail:ti_id - + 1 -1 +1 @@ -2239,31 +2239,31 @@ -task_instance:task_id--task_map:task_id - -0..N -1 +task_instance:map_index--task_map:map_index + +0..N +1 task_instance:run_id--task_map:run_id - + 0..N -1 +1 task_instance:dag_id--task_map:dag_id - + 0..N -1 +1 -task_instance:map_index--task_map:map_index - -0..N -1 +task_instance:task_id--task_map:task_id + +0..N +1 @@ -2304,9 +2304,9 @@ task_instance:id--task_reschedule:ti_id - + 0..N -1 +1 @@ -2355,31 +2355,31 @@ -task_instance:run_id--xcom:run_id - -0..N -1 +task_instance:task_id--xcom:task_id + +0..N +1 -task_instance:dag_id--xcom:dag_id - -0..N -1 +task_instance:run_id--xcom:run_id + +0..N +1 task_instance:map_index--xcom:map_index - + 0..N -1 +1 -task_instance:task_id--xcom:task_id - -0..N -1 +task_instance:dag_id--xcom:dag_id + +0..N +1 @@ -2413,9 +2413,9 @@ task_instance:id--task_instance_note:ti_id - + 1 -1 +1 @@ -2579,30 +2579,30 @@ task_instance:map_index--task_instance_history:map_index - + 0..N -1 +1 task_instance:task_id--task_instance_history:task_id - + 0..N -1 +1 -task_instance:run_id--task_instance_history:run_id - -0..N -1 +task_instance:dag_id--task_instance_history:dag_id + +0..N +1 -task_instance:dag_id--task_instance_history:dag_id - -0..N -1 +task_instance:run_id--task_instance_history:run_id + +0..N +1 @@ -2641,31 +2641,31 @@ +task_instance:map_index--rendered_task_instance_fields:map_index + +0..N +1 + + + task_instance:task_id--rendered_task_instance_fields:task_id - + 0..N -1 +1 - + task_instance:dag_id--rendered_task_instance_fields:dag_id - + 0..N -1 +1 - + task_instance:run_id--rendered_task_instance_fields:run_id - + 0..N -1 - - - -task_instance:map_index--rendered_task_instance_fields:map_index - -0..N -1 +1 diff --git a/airflow-core/docs/migrations-ref.rst b/airflow-core/docs/migrations-ref.rst index 01f0fc1c3cee9..b0d6248e1a5f9 100644 --- a/airflow-core/docs/migrations-ref.rst +++ b/airflow-core/docs/migrations-ref.rst @@ -43,11 +43,9 @@ Here's the list of all the Database Migrations that are executed via when you ru +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``53ff648b8a26`` | ``a5a3e5eb9b8d`` | ``3.2.0`` | Add revoked_token table. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``a5a3e5eb9b8d`` | ``82dbd68e6171`` | ``3.2.0`` | Make external_executor_id TEXT to allow for longer | +| ``a5a3e5eb9b8d`` | ``55297ae24532`` | ``3.2.0`` | Make external_executor_id TEXT to allow for longer | | | | | external_executor_ids. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``82dbd68e6171`` | ``55297ae24532`` | ``3.2.0`` | Add index to task_reschedule ti_id . | -+-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``55297ae24532`` | ``e79fc784f145`` | ``3.2.0`` | Add required fields to enable UI integrations for the | | | | | Deadline Alerts feature. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ @@ -79,7 +77,9 @@ Here's the list of all the Database Migrations that are executed via when you ru | ``ab6dc0c82d0e`` | ``15d84ca19038`` | ``3.2.0`` | Change ``serialized_dag`` data column to JSONB for | | | | | PostgreSQL. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ -| ``15d84ca19038`` | ``cc92b33c6709`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. | +| ``15d84ca19038`` | ``82dbd68e6171`` | ``3.2.0`` | replace asset_trigger table with asset_watcher. | ++-------------------------+------------------+-------------------+--------------------------------------------------------------+ +| ``82dbd68e6171`` | ``cc92b33c6709`` | ``3.1.8`` | Add composite index (ti_id, id DESC) to task_reschedule. | +-------------------------+------------------+-------------------+--------------------------------------------------------------+ | ``cc92b33c6709`` | ``eaf332f43c7c`` | ``3.1.0`` | Add backward compatibility for serialized DAG format v3 to | | | | | v2. | diff --git a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_index_to_task_reschedule_ti_id.py b/airflow-core/src/airflow/migrations/versions/0086_3_1_8_add_index_to_task_reschedule_ti_id.py similarity index 51% rename from airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_index_to_task_reschedule_ti_id.py rename to airflow-core/src/airflow/migrations/versions/0086_3_1_8_add_index_to_task_reschedule_ti_id.py index ed4f1a104963e..4fe0004770a28 100644 --- a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_index_to_task_reschedule_ti_id.py +++ b/airflow-core/src/airflow/migrations/versions/0086_3_1_8_add_index_to_task_reschedule_ti_id.py @@ -17,10 +17,10 @@ # under the License. """ -Add index to task_reschedule ti_id . +Add composite index (ti_id, id DESC) to task_reschedule. Revision ID: 82dbd68e6171 -Revises: 55297ae24532 +Revises: cc92b33c6709 Create Date: 2026-01-22 16:25:42.164449 """ @@ -31,28 +31,46 @@ # revision identifiers, used by Alembic. revision = "82dbd68e6171" -down_revision = "55297ae24532" +down_revision = "cc92b33c6709" branch_labels = None depends_on = None -airflow_version = "3.2.0" +airflow_version = "3.1.8" def upgrade(): - """Add index to task_reschedule ti_id.""" - with op.batch_alter_table("task_reschedule", schema=None) as batch_op: - batch_op.create_index("idx_task_reschedule_ti_id", ["ti_id"], unique=False) + """Add composite (ti_id, id DESC) index to task_reschedule.""" + dialect_name = op.get_context().dialect.name + if dialect_name == "mysql": + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.drop_constraint("task_reschedule_ti_fkey", type_="foreignkey") + op.execute("CREATE INDEX idx_task_reschedule_ti_id_id_desc ON task_reschedule (ti_id, id DESC)") + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.create_foreign_key( + "task_reschedule_ti_fkey", "task_instance", ["ti_id"], ["id"], ondelete="CASCADE" + ) + elif dialect_name == "sqlite": + op.execute("CREATE INDEX idx_task_reschedule_ti_id_id_desc ON task_reschedule (ti_id, id DESC)") + else: + # PostgreSQL + with op.batch_alter_table("task_reschedule", schema=None) as batch_op: + batch_op.create_index( + "idx_task_reschedule_ti_id_id_desc", + ["ti_id", "id"], + unique=False, + postgresql_ops={"id": "DESC"}, + ) def downgrade(): - """Remove index from task_reschedule ti_id.""" + """Remove composite index from task_reschedule.""" dialect_name = op.get_context().dialect.name if dialect_name == "mysql": with op.batch_alter_table("task_reschedule", schema=None) as batch_op: batch_op.drop_constraint("task_reschedule_ti_fkey", type_="foreignkey") - batch_op.drop_index("idx_task_reschedule_ti_id") + batch_op.drop_index("idx_task_reschedule_ti_id_id_desc") batch_op.create_foreign_key( "task_reschedule_ti_fkey", "task_instance", ["ti_id"], ["id"], ondelete="CASCADE" ) else: with op.batch_alter_table("task_reschedule", schema=None) as batch_op: - batch_op.drop_index("idx_task_reschedule_ti_id") + batch_op.drop_index("idx_task_reschedule_ti_id_id_desc") diff --git a/airflow-core/src/airflow/migrations/versions/0086_3_2_0_replace_asset_trigger_table_with_asset.py b/airflow-core/src/airflow/migrations/versions/0087_3_2_0_replace_asset_trigger_table_with_asset.py similarity index 98% rename from airflow-core/src/airflow/migrations/versions/0086_3_2_0_replace_asset_trigger_table_with_asset.py rename to airflow-core/src/airflow/migrations/versions/0087_3_2_0_replace_asset_trigger_table_with_asset.py index 0006abcbe7788..9e43f10491482 100644 --- a/airflow-core/src/airflow/migrations/versions/0086_3_2_0_replace_asset_trigger_table_with_asset.py +++ b/airflow-core/src/airflow/migrations/versions/0087_3_2_0_replace_asset_trigger_table_with_asset.py @@ -20,7 +20,7 @@ replace asset_trigger table with asset_watcher. Revision ID: 15d84ca19038 -Revises: cc92b33c6709 +Revises: 82dbd68e6171 Create Date: 2025-09-14 01:34:40.423767 """ @@ -32,7 +32,7 @@ # revision identifiers, used by Alembic. revision = "15d84ca19038" -down_revision = "cc92b33c6709" +down_revision = "82dbd68e6171" branch_labels = None depends_on = None airflow_version = "3.2.0" diff --git a/airflow-core/src/airflow/migrations/versions/0087_3_2_0_change_serialized_dag_data_column_to_jsonb.py b/airflow-core/src/airflow/migrations/versions/0088_3_2_0_change_serialized_dag_data_column_to_jsonb.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0087_3_2_0_change_serialized_dag_data_column_to_jsonb.py rename to airflow-core/src/airflow/migrations/versions/0088_3_2_0_change_serialized_dag_data_column_to_jsonb.py diff --git a/airflow-core/src/airflow/migrations/versions/0088_3_2_0_add_length_dag_bundle_team_bundle_name.py b/airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_length_dag_bundle_team_bundle_name.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0088_3_2_0_add_length_dag_bundle_team_bundle_name.py rename to airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_length_dag_bundle_team_bundle_name.py diff --git a/airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_human_in_the_loop_detail_history.py b/airflow-core/src/airflow/migrations/versions/0090_3_2_0_add_human_in_the_loop_detail_history.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0089_3_2_0_add_human_in_the_loop_detail_history.py rename to airflow-core/src/airflow/migrations/versions/0090_3_2_0_add_human_in_the_loop_detail_history.py diff --git a/airflow-core/src/airflow/migrations/versions/0090_3_2_0_add_fail_fast_to_dag_table.py b/airflow-core/src/airflow/migrations/versions/0091_3_2_0_add_fail_fast_to_dag_table.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0090_3_2_0_add_fail_fast_to_dag_table.py rename to airflow-core/src/airflow/migrations/versions/0091_3_2_0_add_fail_fast_to_dag_table.py diff --git a/airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py b/airflow-core/src/airflow/migrations/versions/0092_3_2_0_restructure_callback_table.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0091_3_2_0_restructure_callback_table.py rename to airflow-core/src/airflow/migrations/versions/0092_3_2_0_restructure_callback_table.py diff --git a/airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py b/airflow-core/src/airflow/migrations/versions/0093_3_2_0_replace_deadline_inline_callback_with_fkey.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0092_3_2_0_replace_deadline_inline_callback_with_fkey.py rename to airflow-core/src/airflow/migrations/versions/0093_3_2_0_replace_deadline_inline_callback_with_fkey.py diff --git a/airflow-core/src/airflow/migrations/versions/0093_3_2_0_update_orm_asset_partitioning.py b/airflow-core/src/airflow/migrations/versions/0094_3_2_0_update_orm_asset_partitioning.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0093_3_2_0_update_orm_asset_partitioning.py rename to airflow-core/src/airflow/migrations/versions/0094_3_2_0_update_orm_asset_partitioning.py diff --git a/airflow-core/src/airflow/migrations/versions/0094_3_2_0_remove_team_id.py b/airflow-core/src/airflow/migrations/versions/0095_3_2_0_remove_team_id.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0094_3_2_0_remove_team_id.py rename to airflow-core/src/airflow/migrations/versions/0095_3_2_0_remove_team_id.py diff --git a/airflow-core/src/airflow/migrations/versions/0095_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py b/airflow-core/src/airflow/migrations/versions/0096_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0095_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py rename to airflow-core/src/airflow/migrations/versions/0096_3_2_0_enforce_log_event_and_dag_is_stale_not_null.py diff --git a/airflow-core/src/airflow/migrations/versions/0096_3_2_0_add_queue_column_to_trigger.py b/airflow-core/src/airflow/migrations/versions/0097_3_2_0_add_queue_column_to_trigger.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0096_3_2_0_add_queue_column_to_trigger.py rename to airflow-core/src/airflow/migrations/versions/0097_3_2_0_add_queue_column_to_trigger.py diff --git a/airflow-core/src/airflow/migrations/versions/0097_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py b/airflow-core/src/airflow/migrations/versions/0098_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0097_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py rename to airflow-core/src/airflow/migrations/versions/0098_3_2_0_add_exceeds_max_runs_flag_to_dag_model.py diff --git a/airflow-core/src/airflow/migrations/versions/0098_3_2_0_add_timetable_type_to_dag_table_for_.py b/airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_timetable_type_to_dag_table_for_.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0098_3_2_0_add_timetable_type_to_dag_table_for_.py rename to airflow-core/src/airflow/migrations/versions/0099_3_2_0_add_timetable_type_to_dag_table_for_.py diff --git a/airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_ui_improvements_for_deadlines.py similarity index 100% rename from airflow-core/src/airflow/migrations/versions/0099_3_2_0_ui_improvements_for_deadlines.py rename to airflow-core/src/airflow/migrations/versions/0100_3_2_0_ui_improvements_for_deadlines.py diff --git a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_make_external_executor_id_text.py b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_make_external_executor_id_text.py index f05cb172e307b..425bb073066dc 100644 --- a/airflow-core/src/airflow/migrations/versions/0101_3_2_0_make_external_executor_id_text.py +++ b/airflow-core/src/airflow/migrations/versions/0101_3_2_0_make_external_executor_id_text.py @@ -20,7 +20,7 @@ Make external_executor_id TEXT to allow for longer external_executor_ids. Revision ID: a5a3e5eb9b8d -Revises: 82dbd68e6171 +Revises: 55297ae24532 Create Date: 2026-01-28 16:35:00.000000 """ @@ -32,7 +32,7 @@ # revision identifiers, used by Alembic. revision = "a5a3e5eb9b8d" -down_revision = "82dbd68e6171" +down_revision = "55297ae24532" branch_labels = None depends_on = None airflow_version = "3.2.0" diff --git a/airflow-core/src/airflow/models/taskreschedule.py b/airflow-core/src/airflow/models/taskreschedule.py index a7c3022b8d77c..b9514b33ed952 100644 --- a/airflow-core/src/airflow/models/taskreschedule.py +++ b/airflow-core/src/airflow/models/taskreschedule.py @@ -58,7 +58,7 @@ class TaskReschedule(Base): duration: Mapped[int] = mapped_column(Integer, nullable=False) reschedule_date: Mapped[datetime.datetime] = mapped_column(UtcDateTime, nullable=False) - __table_args__ = (Index("idx_task_reschedule_ti_id", ti_id),) + __table_args__ = (Index("idx_task_reschedule_ti_id_id_desc", ti_id, id.desc()),) task_instance = relationship( "TaskInstance", primaryjoin="TaskReschedule.ti_id == foreign(TaskInstance.id)", uselist=False diff --git a/airflow-core/src/airflow/utils/db.py b/airflow-core/src/airflow/utils/db.py index 01407f3b004b3..01b5cc21b4690 100644 --- a/airflow-core/src/airflow/utils/db.py +++ b/airflow-core/src/airflow/utils/db.py @@ -112,6 +112,7 @@ class MappedClassProtocol(Protocol): "3.0.0": "29ce7909c52b", "3.0.3": "fe199e1abd77", "3.1.0": "cc92b33c6709", + "3.1.8": "82dbd68e6171", "3.2.0": "f8c9d7e6b5a4", }