Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions airflow-core/src/airflow/metrics/otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import atexit
import datetime
import logging
import random
Expand Down Expand Up @@ -370,6 +371,15 @@ def set_gauge_value(self, name: str, value: int | float, delta: bool, tags: Attr
self.map[key].set_value(value, delta)


def flush_otel_metrics():
    """
    Ask the globally configured meter provider to export any buffered metrics.

    This function is registered as an ``atexit`` hook, so it must not fail when the
    global provider cannot be flushed.
    """
    provider = metrics.get_meter_provider()
    # ``force_flush`` is an SDK-level method. If no SDK provider has been set,
    # the object returned here may be an API-level proxy/no-op provider that lacks it;
    # in that case there is nothing buffered to export, so skip the call.
    force_flush = getattr(provider, "force_flush", None)
    if callable(force_flush):
        force_flush()


def atexit_register_metrics_flush():
    """
    Register ``flush_otel_metrics`` to run at interpreter shutdown, exactly once.

    ``atexit`` keeps one entry per ``register()`` call, so calling this function
    repeatedly would otherwise make the flush run several times at exit.
    """
    # ``unregister`` silently does nothing when the function was never registered,
    # and removes every earlier occurrence when it was.
    atexit.unregister(flush_otel_metrics)
    atexit.register(flush_otel_metrics)


def get_otel_logger(cls) -> SafeOtelLogger:
host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector"
port = conf.getint("metrics", "otel_port") # ex: 4318
Expand Down Expand Up @@ -405,4 +415,7 @@ def get_otel_logger(cls) -> SafeOtelLogger:
),
)

# Register a hook that flushes any in-memory metrics at shutdown.
atexit_register_metrics_flush()

return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator())
29 changes: 29 additions & 0 deletions airflow-core/src/airflow/stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from __future__ import annotations

import logging
import os
import socket
from collections.abc import Callable
from typing import TYPE_CHECKING
Expand All @@ -34,14 +35,42 @@
class _Stats(type):
factory: Callable
instance: StatsLogger | NoStatsLogger | None = None
_instance_pid: int | None = None

def __getattr__(cls, name: str) -> str:
    """
    Resolve ``name`` on the stats logger instance, creating the instance on first use.

    When using OpenTelemetry, some subprocesses are short-lived and often exit
    before flushing any metrics. The solution is to register a hook that performs
    a force flush at exit; that hook is registered when the instance is initialized.

    The instance is initialized once per process. A forked subprocess inherits the
    already initialized instance of its parent, so the pid that created the instance
    is stored and compared with the current pid to decide whether the instance has
    to be initialized again.

    So far, all forks are resetting their state to remove anything inherited by the
    parent, but in the future that might not always be true.
    """
    pid = os.getpid()

    inherited_from_other_process = cls.instance and cls._instance_pid != pid
    if inherited_from_other_process:
        log.info(
            "Stats instance was created in PID %s but accessed in PID %s. Re-initializing.",
            cls._instance_pid,
            pid,
        )
        # Dropping the instance forces the initialization below to run again.
        cls.instance = None
        cls._instance_pid = None

    # Fast path: a usable instance that belongs to this process already exists.
    if cls.instance:
        return getattr(cls.instance, name)

    try:
        stats_logger = cls.factory()
    except (socket.gaierror, ImportError) as e:
        log.error("Could not configure StatsClient: %s, using NoStatsLogger instead.", e)
        stats_logger = NoStatsLogger()

    cls.instance = stats_logger
    cls._instance_pid = pid
    return getattr(cls.instance, name)

def __init__(cls, *args, **kwargs) -> None:
Expand Down
104 changes: 104 additions & 0 deletions airflow-core/tests/integration/otel/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
assert_span_name_belongs_to_root_span,
assert_span_not_in_children_spans,
dump_airflow_metadata_db,
extract_metrics_from_output,
extract_spans_from_output,
get_parent_child_dict,
)
Expand Down Expand Up @@ -193,6 +194,15 @@ def check_ti_state_and_span_status(task_id: str, run_id: str, state: str, span_s
)


def check_metrics_exist(output: str, metrics_to_check: list[str]):
    """
    Assert that every name in ``metrics_to_check`` was found in the captured output.

    :param output: The captured stdout as a single string.
    :param metrics_to_check: Metric names that must be present in the output.
    :raises AssertionError: If at least one name is absent; the message lists the missing names.
    """
    # The extraction helper works on a list of lines, not on the raw string.
    metrics_dict = extract_metrics_from_output(output.splitlines())

    # Report exactly which names are absent instead of a bare subset failure.
    missing = sorted(set(metrics_to_check) - set(metrics_dict))
    assert not missing, f"Metrics not found in the captured output: {missing}"


def check_spans_with_continuance(output: str, dag: DAG, continuance_for_t1: bool = True):
# Get a list of lines from the captured output.
output_lines = output.splitlines()
Expand Down Expand Up @@ -769,6 +779,100 @@ def cleanup_control_file_if_needed(self):
except Exception as ex:
log.error("Could not delete leftover control file '%s', error: '%s'.", self.control_file, ex)

def dag_execution_for_testing_metrics(self, capfd):
    """
    Run ``otel_test_dag`` once with OTel metrics enabled and return the captured stdout.

    The celery worker, scheduler and apiserver are started inside this method so that
    their output can be captured; all of them are terminated before the output is read.

    :param capfd: The pytest capture fixture used to read the processes' stdout/stderr.
    :return: A tuple of the captured stdout and the dag that was run.
    """
    # Metrics.
    os.environ["AIRFLOW__METRICS__OTEL_ON"] = "True"
    os.environ["AIRFLOW__METRICS__OTEL_HOST"] = "breeze-otel-collector"
    os.environ["AIRFLOW__METRICS__OTEL_PORT"] = "4318"
    os.environ["AIRFLOW__METRICS__OTEL_INTERVAL_MILLISECONDS"] = "5000"

    if self.use_otel != "true":
        os.environ["AIRFLOW__METRICS__OTEL_DEBUGGING_ON"] = "True"

    celery_worker_process = None
    scheduler_process = None
    apiserver_process = None
    try:
        # Start the processes here and not as fixtures or in a common setup,
        # so that the test can capture their output.
        celery_worker_process, scheduler_process, apiserver_process = self.start_worker_and_scheduler1()

        dag_id = "otel_test_dag"

        assert len(self.dags) > 0
        dag = self.dags[dag_id]

        assert dag is not None

        run_id = unpause_trigger_dag_and_get_run_id(dag_id=dag_id)

        # Skip the span_status check.
        wait_for_dag_run_and_check_span_status(
            dag_id=dag_id, run_id=run_id, max_wait_time=90, span_status=None
        )

        # The ti span_status is updated while processing the executor events,
        # which is after the dag_run state has been updated.
        time.sleep(10)

        for task_id in dag.task_dict:
            # Skip the span_status check.
            check_ti_state_and_span_status(
                task_id=task_id, run_id=run_id, state=State.SUCCESS, span_status=None
            )

        print_ti_output_for_dag_run(dag_id=dag_id, run_id=run_id)
    finally:
        # Only processes that were actually started can be terminated. If the startup
        # call above raised, the handles are still None and touching them would replace
        # the real error with an AttributeError.
        started_processes = [
            (label, process)
            for label, process in (
                ("celery worker", celery_worker_process),
                ("scheduler_1", scheduler_process),
                ("apiserver", apiserver_process),
            )
            if process is not None
        ]

        # Terminate all of them before asserting anything, so that a failed check
        # for one process cannot leave the remaining ones running.
        for _, process in started_processes:
            process.terminate()
            process.wait()

        for label, process in started_processes:
            assert process.poll() is not None, (
                f"The {label} process status is None, which means that it hasn't terminated as expected."
            )

    out, err = capfd.readouterr()
    log.info("out-start --\n%s\n-- out-end", out)
    log.info("err-start --\n%s\n-- err-end", err)

    return out, dag

def test_export_metrics_during_process_shutdown(
    self, monkeypatch, celery_worker_env_vars, capfd, session
):
    """
    Run the test dag and verify that the expected metrics show up in the captured output.

    The metrics are only checked when ``use_otel`` is not ``"true"``, which is the case
    where the dag execution helper turns on the OTel debugging output.
    """
    captured_out, _dag = self.dag_execution_for_testing_metrics(capfd)

    if self.use_otel == "true":
        # Nothing is asserted against the captured output in this mode.
        return

    # Test the metrics from the output.
    expected_metric_names = [
        "airflow.ti_successes",
        "airflow.operator_successes",
        "airflow.executor.running_tasks",
        "airflow.executor.queued_tasks",
        "airflow.executor.open_slots",
    ]
    check_metrics_exist(output=captured_out, metrics_to_check=expected_metric_names)

@pytest.mark.execution_timeout(90)
def test_dag_execution_succeeds(self, monkeypatch, celery_worker_env_vars, capfd, session):
"""The same scheduler will start and finish the dag processing."""
Expand Down
55 changes: 55 additions & 0 deletions airflow-core/tests/unit/core/test_otel_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
from __future__ import annotations

import logging
import os
import pathlib
import subprocess
import sys
import time
from unittest import mock

Expand All @@ -32,8 +36,12 @@
_generate_key_name,
_is_up_down_counter,
full_name,
get_otel_logger,
)
from airflow.metrics.validators import BACK_COMPAT_METRIC_NAMES, MetricNameLengthExemptionWarning
from airflow.stats import Stats

from tests_common.test_utils.config import conf_vars

INVALID_STAT_NAME_CASES = [
(None, "can not be None"),
Expand Down Expand Up @@ -302,3 +310,50 @@ def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
assert timer.duration == expected_value
assert mock_time.call_count == 2
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))

def test_atexit_flush_on_process_exit(self):
    """
    Run a process that initializes a logger, creates a stat and then exits.

    The logger initialization registers an atexit hook.
    Test that the hook runs and flushes the created stat at shutdown.
    """
    test_module_name = "unit.core.test_otel_logger"
    function_call_str = f"import {test_module_name} as m; m.mock_service_run()"

    # pytest adds 'airflow-core/tests' to the path and makes the package 'unit.core' importable.
    # The subprocess doesn't inherit it, and in order to make the package importable,
    # the current tests directory needs to be injected into the 'PYTHONPATH'.
    #
    # Get 'airflow-core/tests' and add it to the env copy that is passed to the subprocess.
    tests_dir = str(pathlib.Path(__file__).resolve().parents[2])
    current_env = os.environ.copy()
    # Append the inherited value only when it is non-empty. A trailing separator would
    # add an empty entry, i.e. the current working directory, to the import path.
    inherited_pythonpath = current_env.get("PYTHONPATH")
    current_env["PYTHONPATH"] = os.pathsep.join(
        entry for entry in (tests_dir, inherited_pythonpath) if entry
    )

    proc = subprocess.run(
        [sys.executable, "-c", function_call_str],
        check=False,
        env=current_env,
        capture_output=True,
        text=True,
        timeout=20,
    )

    assert proc.returncode == 0, f"Process failed\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"

    assert "my_test_stat" in proc.stdout, (
        "Expected the metric name to be present in the stdout but it wasn't.\n"
        f"stdout:\n{proc.stdout}\n"
        f"stderr:\n{proc.stderr}"
    )


def mock_service_run():
    """
    Create an OTel logger with debugging output enabled and increment one stat.

    This is the entry point that ``test_atexit_flush_on_process_exit`` runs in a
    separate python process; it returns right after emitting the stat.
    """
    otel_overrides = {
        ("metrics", "otel_on"): "True",
        ("metrics", "otel_debugging_on"): "True",
    }
    with conf_vars(otel_overrides):
        otel_logger = get_otel_logger(Stats)
        otel_logger.incr("my_test_stat")
68 changes: 51 additions & 17 deletions devel-common/src/tests_common/test_utils/otel_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import json
import logging
import pprint
from collections import defaultdict
from typing import Literal

from sqlalchemy import inspect

Expand Down Expand Up @@ -94,14 +96,21 @@ def clean_task_lines(lines: list) -> list:
return cleaned_lines


def extract_spans_from_output(output_lines: list):
def _extract_obj_from_output(output_lines: list[str], kind: Literal["spans"] | Literal["metrics"]):
"""
For a given list of ConsoleSpanExporter output lines, it extracts the json spans and creates two dictionaries.
Used to extract spans or metrics from the output.

:return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span)
Parameters
----------
:param output_lines: The captured stdout split into lines.
:param kind: Which json type to extract from the output.
"""
assert kind in ("spans", "metrics")

span_dict = {}
root_span_dict = {}
metric_dict: dict[str, list[dict]] = defaultdict(list)

total_lines = len(output_lines)
index = 0
output_lines = clean_task_lines(output_lines)
Expand Down Expand Up @@ -133,23 +142,48 @@ def extract_spans_from_output(output_lines: list):
# Create a formatted json string and then convert the string to a python dict.
json_str = "\n".join(json_lines)
try:
span = json.loads(json_str)
span_id = span["context"]["span_id"]
span_dict[span_id] = span

if span["parent_id"] is None:
# This is a root span, add it to the root_span_map as well.
root_span_id = span["context"]["span_id"]
root_span_dict[root_span_id] = span

except json.JSONDecodeError as e:
log.error("Failed to parse JSON span: %s", e)
log.error("Failed JSON string:")
log.error(json_str)
obj = json.loads(json_str)
except json.JSONDecodeError:
log.error("Failed to parse JSON: %s", json_str)
index += 1
continue

if kind == "spans":
if "context" not in obj or "resource_metrics" in obj:
index += 1
continue
span_id = obj["context"]["span_id"]
span_dict[span_id] = obj
if obj["parent_id"] is None:
root_span_dict[span_id] = obj
else: # kind == "metrics"
if "resource_metrics" not in obj:
index += 1
continue
for res in obj["resource_metrics"]:
for scope in res.get("scope_metrics", []):
for metric in scope.get("metrics", []):
metric_dict[metric["name"]].append(metric)

else:
index += 1

return root_span_dict, span_dict
return (root_span_dict, span_dict) if kind == "spans" else metric_dict


def extract_spans_from_output(output_lines: list):
    """
    Collect the json spans found in the given output lines into two dictionaries.

    :param output_lines: The captured output split into lines.
    :return: root spans dict (key: root_span_id - value: root_span), spans dict (key: span_id - value: span)
    """
    return _extract_obj_from_output(output_lines=output_lines, kind="spans")


def extract_metrics_from_output(output_lines: list):
    """
    Collect the json metrics found in the given output lines into a dictionary.

    :param output_lines: The captured output split into lines.
    """
    return _extract_obj_from_output(output_lines=output_lines, kind="metrics")


def get_id_for_a_given_name(span_dict: dict, span_name: str):
Expand Down