Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ def is_effective_leaf(task):
return leaf_tis

def _emit_dagrun_span(self, state: DagRunState):
ctx = TraceContextTextMapPropagator().extract(self.context_carrier)
ctx = TraceContextTextMapPropagator().extract(self.context_carrier or {})
span = trace.get_current_span(context=ctx)
span_context = span.get_span_context()
with override_ids(span_context.trace_id, span_context.span_id):
Expand Down
39 changes: 39 additions & 0 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -3518,3 +3518,42 @@ def test_emit_dagrun_span_attributes_and_status(self, dag_maker, session, final_

expected_status = StatusCode.OK if final_state == DagRunState.SUCCESS else StatusCode.ERROR
assert span.status.status_code == expected_status

@pytest.mark.parametrize("carrier_value", [None, {}])
def test_emit_dagrun_span_with_none_or_empty_carrier(self, dag_maker, session, carrier_value):
"""_emit_dagrun_span should emit a root span when context_carrier is None or empty.

This happens for DagRuns created before OTel tracing was enabled, or whose
context_carrier was cleared/backfilled to NULL. Per OTel spec, missing context
results in a new root span rather than a crash.
"""
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from airflow._shared.observability.traces import OverrideableRandomIdGenerator

in_mem_exporter = InMemorySpanExporter()
provider = TracerProvider(id_generator=OverrideableRandomIdGenerator())
provider.add_span_processor(SimpleSpanProcessor(in_mem_exporter))
test_tracer = provider.get_tracer("test")

with dag_maker("test_tracing_none_carrier", session=session) as dag:
EmptyOperator(task_id="t1")

dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
ti = dr.get_task_instance("t1", session=session)
ti.state = TaskInstanceState.SUCCESS
session.flush()
dr.dag = dag

# Simulate a DagRun with missing context_carrier
dr.context_carrier = carrier_value

with mock.patch("airflow.models.dagrun.tracer", test_tracer):
dr.update_state(session=session)

# A root span should still be emitted
spans = in_mem_exporter.get_finished_spans()
assert len(spans) == 1
assert spans[0].name == f"dag_run.{dr.dag_id}"
Loading