From 0b3d0e9fb5e083ab10d0edd8270aa5d09b832763 Mon Sep 17 00:00:00 2001 From: Christos Bisias Date: Fri, 13 Feb 2026 15:30:34 +0200 Subject: [PATCH 1/3] manually backport and resolve conflicts --- .../src/airflow/metrics/otel_logger.py | 13 +++ airflow-core/src/airflow/stats.py | 28 +++++ .../tests/integration/otel/test_otel.py | 104 ++++++++++++++++++ .../tests/unit/core/test_otel_logger.py | 45 ++++++++ .../src/tests_common/test_utils/otel_utils.py | 68 +++++++++--- 5 files changed, 241 insertions(+), 17 deletions(-) diff --git a/airflow-core/src/airflow/metrics/otel_logger.py b/airflow-core/src/airflow/metrics/otel_logger.py index 317b70a5ca29d..6b75428529c02 100644 --- a/airflow-core/src/airflow/metrics/otel_logger.py +++ b/airflow-core/src/airflow/metrics/otel_logger.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import atexit import datetime import logging import random @@ -370,6 +371,15 @@ def set_gauge_value(self, name: str, value: int | float, delta: bool, tags: Attr self.map[key].set_value(value, delta) +def flush_otel_metrics(): + provider = metrics.get_meter_provider() + provider.force_flush() + + +def atexit_register_metrics_flush(): + atexit.register(flush_otel_metrics) + + def get_otel_logger(cls) -> SafeOtelLogger: host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector" port = conf.getint("metrics", "otel_port") # ex: 4318 @@ -405,4 +415,7 @@ def get_otel_logger(cls) -> SafeOtelLogger: ), ) + # Register a hook that flushes any in-memory metrics at shutdown. 
+ atexit_register_metrics_flush() + return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator()) diff --git a/airflow-core/src/airflow/stats.py b/airflow-core/src/airflow/stats.py index 6cb9229ab7388..72e86e5239a1b 100644 --- a/airflow-core/src/airflow/stats.py +++ b/airflow-core/src/airflow/stats.py @@ -18,6 +18,7 @@ from __future__ import annotations import logging +import os import socket from collections.abc import Callable from typing import TYPE_CHECKING @@ -36,12 +37,39 @@ class _Stats(type): instance: StatsLogger | NoStatsLogger | None = None def __getattr__(cls, name: str) -> str: + # When using OpenTelemetry, some subprocesses are short-lived and + # often exit before flushing any metrics. + # + # The solution is to register a hook that performs a force flush at exit. + # The atexit hook is registered when initializing the instance. + # + # The instance gets initialized once per process. In case a process is forked, then + # the new subprocess will inherit the already initialized instance of the parent process. + # + # Store the instance pid so that it can be compared with the current pid + # to decide whether to initialize the instance again or not. + # + # So far, all forks are resetting their state to remove anything inherited by the parent. + # But in the future that might not always be true. + current_pid = os.getpid() + if cls.instance and cls._instance_pid != current_pid: + log.info( + "Stats instance was created in PID %s but accessed in PID %s. Re-initializing.", + cls._instance_pid, + current_pid, + ) + # Setting the instance to None will force re-initialization.
+ cls.instance = None + cls._instance_pid = None + if not cls.instance: try: cls.instance = cls.factory() + cls._instance_pid = current_pid except (socket.gaierror, ImportError) as e: log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.", e) cls.instance = NoStatsLogger() + cls._instance_pid = current_pid return getattr(cls.instance, name) def __init__(cls, *args, **kwargs) -> None: diff --git a/airflow-core/tests/integration/otel/test_otel.py b/airflow-core/tests/integration/otel/test_otel.py index 51f70c5129161..944f350c6e16b 100644 --- a/airflow-core/tests/integration/otel/test_otel.py +++ b/airflow-core/tests/integration/otel/test_otel.py @@ -45,6 +45,7 @@ assert_span_name_belongs_to_root_span, assert_span_not_in_children_spans, dump_airflow_metadata_db, + extract_metrics_from_output, extract_spans_from_output, get_parent_child_dict, ) @@ -193,6 +194,15 @@ def check_ti_state_and_span_status(task_id: str, run_id: str, state: str, span_s ) +def check_metrics_exist(output: str, metrics_to_check: list[str]): + # Get a list of lines from the captured output. + output_lines = output.splitlines() + + metrics_dict = extract_metrics_from_output(output_lines) + + assert set(metrics_to_check).issubset(metrics_dict.keys()) + + def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: bool = True): # Get a list of lines from the captured output. output_lines = output.splitlines() @@ -769,6 +779,100 @@ def cleanup_control_file_if_needed(self): except Exception as ex: log.error("Could not delete leftover control file '%s', error: '%s'.", self.control_file, ex) + def dag_execution_for_testing_metrics(self, capfd): + # Metrics. 
+ os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True" + os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector" + os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318" + os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000" + + if self.use_otel != "true": + os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True" + + celery_worker_process = None + scheduler_process = None + apiserver_process = None + try: + # Start the processes here and not as fixtures or in a common setup, + # so that the test can capture their output. + celery_worker_process, scheduler_process, apiserver_process = self.start_worker_and_scheduler1() + + dag_id = "otel_test_dag" + + assert len(self.dags) > 0 + dag = self.dags[dag_id] + + assert dag is not None + + run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id) + + # Skip the span_status check. + wait_for_dag_run_and_check_span_status( + dag_id=dag_id, run_id=run_id, max_wait_time=90, span_status=None + ) + + # The ti span_status is updated while processing the executor events, + # which is after the dag_run state has been updated. + time.sleep(10) + + task_dict = dag.task_dict + task_dict_ids = task_dict.keys() + + for task_id in task_dict_ids: + # Skip the span_status check. + check_ti_state_and_span_status( + task_id=task_id, run_id=run_id, state=State.SUCCESS, span_status=None + ) + + print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id) + finally: + # Terminate the processes. + celery_worker_process.terminate() + celery_worker_process.wait() + + celery_status = celery_worker_process.poll() + assert celery_status is not None, ( + "The celery worker process status is None, which means that it hasn't terminated as expected." + ) + + scheduler_process.terminate() + scheduler_process.wait() + + scheduler_status = scheduler_process.poll() + assert scheduler_status is not None, ( + "The scheduler_1 process status is None, which means that it hasn't terminated as expected." 
+ ) + + apiserver_process.terminate() + apiserver_process.wait() + + apiserver_status = apiserver_process.poll() + assert apiserver_status is not None, ( + "The apiserver process status is None, which means that it hasn't terminated as expected." + ) + + out, err = capfd.readouterr() + log.info("out-start --\n%s\n-- out-end", out) + log.info("err-start --\n%s\n-- err-end", err) + + return out, dag + + def test_export_metrics_during_process_shutdown( + self, monkeypatch, celery_worker_env_vars, capfd, session + ): + out, dag = self.dag_execution_for_testing_metrics(capfd) + + if self.use_otel != "true": + # Test the metrics from the output. + metrics_to_check = [ + "airflow.ti_successes", + "airflow.operator_successes", + "airflow.executor.running_tasks", + "airflow.executor.queued_tasks", + "airflow.executor.open_slots", + ] + check_metrics_exist(output=out, metrics_to_check=metrics_to_check) + @pytest.mark.execution_timeout(90) def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, capfd, session): """The same scheduler will start and finish the dag processing.""" diff --git a/airflow-core/tests/unit/core/test_otel_logger.py b/airflow-core/tests/unit/core/test_otel_logger.py index 43d30ff9d0703..5bed56164b9da 100644 --- a/airflow-core/tests/unit/core/test_otel_logger.py +++ b/airflow-core/tests/unit/core/test_otel_logger.py @@ -17,6 +17,9 @@ from __future__ import annotations import logging +import os +import subprocess +import sys import time from unittest import mock @@ -32,8 +35,12 @@ _generate_key_name, _is_up_down_counter, full_name, + get_otel_logger, ) from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES, MetricNameLengthExemptionWarning +from airflow.stats import Stats + +from tests_common.test_utils.config import conf_vars INVALID_STAT_NAME_CASES = [ (None, "can not be None"), @@ -302,3 +309,41 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, name): assert timer.duration == expected_value assert 
mock_time.call_count == 2 self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name)) + + def test_atexit_flush_on_process_exit(self): + """ + Run a process that initializes a logger, creates a stat and then exits. + + The logger initialization registers an atexit hook. + Test that the hook runs and flushes the created stat at shutdown. + """ + test_module_name = "unit.core.test_otel_logger" + function_call_str = f"import {test_module_name} as m; m.mock_service_run()" + + proc = subprocess.run( + [sys.executable, "-c", function_call_str], + check=False, + env=os.environ.copy(), + capture_output=True, + text=True, + timeout=20, + ) + + assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}" + + assert "my_test_stat" in proc.stdout, ( + "Expected the metric name to be present in the stdout but it wasn't.\n" + f"stdout:\n{proc.stdout}\n" + f"stderr:\n{proc.stderr}" + ) + + +def mock_service_run(): + with conf_vars( + { + ("metrics", "otel_on"): "True", + ("metrics", "otel_debugging_on"): "True", + } + ): + logger = get_otel_logger(Stats) + logger.incr("my_test_stat") diff --git a/devel-common/src/tests_common/test_utils/otel_utils.py b/devel-common/src/tests_common/test_utils/otel_utils.py index 3e8e99ac5bf1f..a2f53b4738170 100644 --- a/devel-common/src/tests_common/test_utils/otel_utils.py +++ b/devel-common/src/tests_common/test_utils/otel_utils.py @@ -19,6 +19,8 @@ import json import logging import pprint +from collections import defaultdict +from typing import Literal from sqlalchemy import inspect @@ -94,14 +96,21 @@ def clean_task_lines(lines: list) -> list: return cleaned_lines -def extract_spans_from_output(output_lines: list): +def _extract_obj_from_output(output_lines: list[str], kind: Literal["spans"] | Literal["metrics"]): """ - For a given list of ConsoleSpanExporter output lines, it extracts the json spans and creates two dictionaries. + Used to extract spans or metrics from the output. 
- :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span) + Parameters + ---------- + :param output_lines: The captured stdout split into lines. + :param kind: Which json type to extract from the output. """ + assert kind in ("spans", "metrics") + span_dict = {} root_span_dict = {} + metric_dict: dict[str, list[dict]] = defaultdict(list) + total_lines = len(output_lines) index = 0 output_lines = clean_task_lines(output_lines) @@ -133,23 +142,48 @@ def extract_spans_from_output(output_lines: list): # Create a formatted json string and then convert the string to a python dict. json_str = "\n".join(json_lines) try: - span = json.loads(json_str) - span_id = span["context"]["span_id"] - span_dict[span_id] = span - - if span["parent_id"] is None: - # This is a root span, add it to the root_span_map as well. - root_span_id = span["context"]["span_id"] - root_span_dict[root_span_id] = span - - except json.JSONDecodeError as e: - log.error("Failed to parse JSON span: %s", e) - log.error("Failed JSON string:") - log.error(json_str) + obj = json.loads(json_str) + except json.JSONDecodeError: + log.error("Failed to parse JSON: %s", json_str) + index += 1 + continue + + if kind == "spans": + if "context" not in obj or "resource_metrics" in obj: + index += 1 + continue + span_id = obj["context"]["span_id"] + span_dict[span_id] = obj + if obj["parent_id"] is None: + root_span_dict[span_id] = obj + else: # kind == "metrics" + if "resource_metrics" not in obj: + index += 1 + continue + for res in obj["resource_metrics"]: + for scope in res.get("scope_metrics", []): + for metric in scope.get("metrics", []): + metric_dict[metric["name"]].append(metric) + else: index += 1 - return root_span_dict, span_dict + return (root_span_dict, span_dict) if kind == "spans" else metric_dict + + +def extract_spans_from_output(output_lines: list): + """ + For a given list of output lines, it extracts the json spans and creates two dictionaries. 
+ + :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span) + """ + return _extract_obj_from_output(output_lines, "spans") + + +def extract_metrics_from_output(output_lines: list): + """For a given list of output lines, it extracts the json metrics and creates a dictionary.""" + + return _extract_obj_from_output(output_lines, "metrics") def get_id_for_a_given_name(span_dict: dict, span_name: str): From b656279140d9c5cd692e6360d67a690f1e1e80a4 Mon Sep 17 00:00:00 2001 From: Christos Bisias Date: Sun, 15 Feb 2026 16:46:30 +0200 Subject: [PATCH 2/3] fix mypy-airflow-core error --- airflow-core/src/airflow/stats.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/stats.py b/airflow-core/src/airflow/stats.py index 72e86e5239a1b..50d46231fd2b0 100644 --- a/airflow-core/src/airflow/stats.py +++ b/airflow-core/src/airflow/stats.py @@ -35,6 +35,7 @@ class _Stats(type): factory: Callable instance: StatsLogger | NoStatsLogger | None = None + _instance_pid: int | None = None def __getattr__(cls, name: str) -> str: # When using OpenTelemetry, some subprocesses are short-lived and From 13a2218e2bdec6fe161450e5f645d3856a5aa236 Mon Sep 17 00:00:00 2001 From: Christos Bisias Date: Mon, 16 Feb 2026 10:41:54 +0200 Subject: [PATCH 3/3] make package unit.core importable for the test subprocess --- airflow-core/tests/unit/core/test_otel_logger.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/airflow-core/tests/unit/core/test_otel_logger.py b/airflow-core/tests/unit/core/test_otel_logger.py index 5bed56164b9da..db08fbd204e99 100644 --- a/airflow-core/tests/unit/core/test_otel_logger.py +++ b/airflow-core/tests/unit/core/test_otel_logger.py @@ -18,6 +18,7 @@ import logging import os +import pathlib import subprocess import sys import time @@ -320,10 +321,19 @@ def test_atexit_flush_on_process_exit(self): test_module_name = "unit.core.test_otel_logger" function_call_str = f"import 
{test_module_name} as m; m.mock_service_run()" + # pytest adds 'airflow-core/tests' to the path and makes the package 'unit.core' importable. + # The subprocess doesn't inherit it, and in order to make the package importable, + # the current tests directory needs to be injected into the 'PYTHONPATH'. + # + # Get 'airflow-core/tests' and add it to the env copy that is passed to the subprocess. + tests_dir = str(pathlib.Path(__file__).resolve().parents[2]) + current_env = os.environ.copy() + current_env["PYTHONPATH"] = tests_dir + os.pathsep + current_env.get("PYTHONPATH", "") + proc = subprocess.run( [sys.executable, "-c", function_call_str], check=False, - env=os.environ.copy(), + env=current_env, capture_output=True, text=True, timeout=20,