Don't abort kafka_consumer when one partition's high-watermark lookup fails#24263
Draft
piochelepiotr wants to merge 5 commits into
Draft
Don't abort kafka_consumer when one partition's high-watermark lookup fails#24263piochelepiotr wants to merge 5 commits into
piochelepiotr wants to merge 5 commits into
Conversation
… fails The high-watermark offset collection issues a single batched offsets_for_times call for all topic-partitions. If any partition cannot be resolved at all (e.g. a leaderless/offline RF-0 or mid-deletion topic), librdkafka raises a KafkaException (UNKNOWN_TOPIC_OR_PART) for the entire batch call, which propagates up and aborts the whole check so no metrics are collected. Fall back to per-partition lookups when the batched call fails, skipping only the partitions that still raise, so healthy partitions are still collected and the check is not aborted. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
🎉 All green!🧪 All tests passed 🎯 Code Coverage (details) 🔗 Commit SHA: d6807c7 | Docs | Datadog PR Page | Give us feedback! |
Replace the resilient-but-clunky offsets_for_times approach with AdminClient.list_offsets, which returns a future per partition. Errored partitions (e.g. leaderless/offline topics that raise UNKNOWN_TOPIC_OR_PART) are skipped individually, so healthy partitions are still collected without a wholesale raise or a per-partition retry loop. list_offsets is also the purpose-built API for latest/earliest offsets, unlike offsets_for_times with sentinel timestamp values. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Mirror cluster_metadata.fetch_earliest_offsets in get_partition_offsets: broaden the per-future handler to catch any Exception so one bad partition does not abort the loop, and wrap the outer list_offsets call so a request/broker-level failure logs a warning and returns [] instead of aborting the whole highwater collection. Strengthen unit tests: assert list_offsets is called with isolation_level=READ_UNCOMMITTED and the request timeout, cover the non-Kafka per-partition error and request-level failure paths, and add an empty-partitions test that issues no request. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
aed9a66 to
d6807c7
Compare
Contributor
Validation ReportAll 21 validations passed. Show details
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What does this PR do?
Fetch
kafka_consumerhigh-watermark offsets viaAdminClient.list_offsets(one future per partition) instead of a single batchedConsumer.offsets_for_times. A partition that can't be resolved (e.g. a leaderless/offline topic) is now skipped individually, instead of raisingUNKNOWN_TOPIC_OR_PARTfor the whole batch and aborting the entire check.Motivation
One broken topic-partition could take the whole integration down — dropping all metrics, not just lag — especially with
enable_cluster_monitoringon and on shared/Confluent Cloud clusters that contain leaderless/RF-0 or mid-deletion topics.Review checklist (to be filled by reviewers)
qa/requiredif this PR needs QA validation, orqa/skip-qaif it does not. Exactly one of the two is required.backport/<branch-name>label to the PR and it will automatically open a backport PR once this one is merged