diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx index dd7a7a45d3bc7..d198a0c7f4948 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/AssetSchedule.tsx @@ -23,7 +23,7 @@ import { useTranslation } from "react-i18next"; import { FiDatabase } from "react-icons/fi"; import { Link as RouterLink } from "react-router-dom"; -import { useAssetServiceNextRunAssets } from "openapi/queries"; +import { useAssetServiceGetDagAssetQueuedEvents, useAssetServiceNextRunAssets } from "openapi/queries"; import { AssetExpression, type ExpressionType } from "src/components/AssetExpression"; import type { NextRunEvent } from "src/components/AssetExpression/types"; import { TruncatedText } from "src/components/TruncatedText"; @@ -61,22 +61,52 @@ const PartitionSchedule = ({ dagId, isLoading, pendingCount }: PartitionSchedule export const AssetSchedule = ({ assetExpression, dagId, latestRunAfter, timetableSummary }: Props) => { const { t: translate } = useTranslation(["dags", "common"]); - const { data: nextRun, isLoading } = useAssetServiceNextRunAssets({ dagId }); const isPartitioned = timetableSummary === "Partitioned Asset"; + const { data: nextRun, isLoading: isNextRunLoading } = useAssetServiceNextRunAssets({ dagId }); + const { + data: queuedEventsData, + error: queuedEventsError, + isLoading: isQueuedEventsLoading, + } = useAssetServiceGetDagAssetQueuedEvents({ dagId }, undefined, { enabled: !isPartitioned }); const nextRunEvents = (nextRun?.events ?? []) as Array; - - const pendingEvents = nextRunEvents.filter((ev) => { - if (ev.lastUpdate === null) { - return false; + const queuedEventsErrorStatus = + typeof queuedEventsError === "object" && queuedEventsError !== null && "status" in queuedEventsError + ? (queuedEventsError as { status?: number }).status + : undefined; + const hasQueuedEventsError = Boolean(queuedEventsError) && queuedEventsErrorStatus !== 404; + const queuedAssetEvents = new Map(); + + if (!isPartitioned && !hasQueuedEventsError) { + for (const event of queuedEventsData?.queued_events ?? []) { + // Keep a single event timestamp per asset, using the latest one when duplicates exist. + const existingEventDate = queuedAssetEvents.get(event.asset_id); + + if (existingEventDate === undefined || dayjs(event.created_at).isAfter(existingEventDate)) { + queuedAssetEvents.set(event.asset_id, event.created_at); + } } + } + + const pendingEvents = nextRunEvents.flatMap((event) => { if (isPartitioned) { - return true; + return event.lastUpdate === null ? [] : [event]; } - return latestRunAfter !== undefined && dayjs(ev.lastUpdate).isAfter(latestRunAfter); + if (hasQueuedEventsError) { + if (event.lastUpdate === null) { + return []; + } + + return latestRunAfter !== undefined && dayjs(event.lastUpdate).isAfter(latestRunAfter) ? [event] : []; + } + + const queuedAt = queuedAssetEvents.get(event.id); + + return queuedAt === undefined ? [] : [{ ...event, lastUpdate: event.lastUpdate ?? queuedAt }]; }); + const isLoading = isNextRunLoading || (!isPartitioned && isQueuedEventsLoading); if (!nextRunEvents.length) { return (