feat: add periodic WARNING metrics to assist in debugging #12976

Open
agrawal-siddharth wants to merge 1 commit into googleapis:main from agrawal-siddharth:dump_warning
Conversation

@agrawal-siddharth
No description provided.

@agrawal-siddharth requested review from a team as code owners May 1, 2026 01:56
@gemini-code-assist (bot) left a comment

Code Review

This pull request introduces a HealthCheckMetrics system within ConnectionWorker to monitor and log stream health, including latency, retry counts, and error responses. It also adds corresponding unit tests to verify metric gathering and threshold logic. The review feedback highlights several critical thread-safety concerns, recommending synchronization for metric updates and ensuring that shared state, such as the retry count and request queues, is accessed within existing locks to prevent race conditions. Additionally, there are suggestions to improve resource efficiency by caching the Gson instance and declaring constants as static final.

Comment on lines +316 to +329
  void updateRequestsSent(long bytes) {
    windowedRequestsSent++;
    windowedRequestsSentBytes += bytes;
  }

  void updateResponsesAcked(long bytes, double latencyMilli, int code) {
    windowedResponsesAcked++;
    windowedResponsesAckedBytes += bytes;
    if (latencyMilli > windowedMilliLatencyMax) {
      windowedMilliLatencyMax = latencyMilli;
    }
    windowedMilliLatencySum += latencyMilli;
    windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
  }

high

The methods updateRequestsSent and updateResponsesAcked must be synchronized to ensure thread safety and visibility. Since gatherHealthCheckMetrics is synchronized and accesses these fields, the update methods must use the same monitor. updateResponsesAcked is particularly critical: it modifies a non-thread-safe HashMap (windowedResponseCodes) and multiple counters, and it can be called concurrently from gRPC callback threads.

Suggested change
- void updateRequestsSent(long bytes) {
-   windowedRequestsSent++;
-   windowedRequestsSentBytes += bytes;
- }
- void updateResponsesAcked(long bytes, double latencyMilli, int code) {
-   windowedResponsesAcked++;
-   windowedResponsesAckedBytes += bytes;
-   if (latencyMilli > windowedMilliLatencyMax) {
-     windowedMilliLatencyMax = latencyMilli;
-   }
-   windowedMilliLatencySum += latencyMilli;
-   windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
- }
+ synchronized void updateRequestsSent(long bytes) {
+   windowedRequestsSent++;
+   windowedRequestsSentBytes += bytes;
+ }
+ synchronized void updateResponsesAcked(long bytes, double latencyMilli, int code) {
+   windowedResponsesAcked++;
+   windowedResponsesAckedBytes += bytes;
+   if (latencyMilli > windowedMilliLatencyMax) {
+     windowedMilliLatencyMax = latencyMilli;
+   }
+   windowedMilliLatencySum += latencyMilli;
+   windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
+ }
References
  1. Ensure that counters or shared state accessed concurrently by multiple threads are thread-safe, either by using Atomic types or by protecting access with a lock.
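
To illustrate the point about sharing one monitor between writers and the reader, here is a minimal sketch. The class `WindowedMetricsSketch`, its accessor methods, and the `runConcurrently` helper are hypothetical and not part of the PR; only the field names are borrowed from the diff above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the windowed counters in HealthCheckMetrics.
// Field names mirror the PR; everything else is illustrative.
class WindowedMetricsSketch {
  private long windowedResponsesAcked;
  private long windowedResponsesAckedBytes;
  private double windowedMilliLatencyMax;
  private double windowedMilliLatencySum;
  private final Map<Integer, Integer> windowedResponseCodes = new HashMap<>();

  // Writers take the same monitor (this) as the readers below, so the
  // HashMap and the counters are never touched by two threads at once.
  synchronized void updateResponsesAcked(long bytes, double latencyMilli, int code) {
    windowedResponsesAcked++;
    windowedResponsesAckedBytes += bytes;
    windowedMilliLatencyMax = Math.max(windowedMilliLatencyMax, latencyMilli);
    windowedMilliLatencySum += latencyMilli;
    windowedResponseCodes.merge(code, 1, Integer::sum);
  }

  synchronized long responsesAcked() {
    return windowedResponsesAcked;
  }

  synchronized int countForCode(int code) {
    return windowedResponseCodes.getOrDefault(code, 0);
  }

  // Drives updates from several threads and returns the populated metrics.
  static WindowedMetricsSketch runConcurrently(int threads, int updatesPerThread) {
    WindowedMetricsSketch metrics = new WindowedMetricsSketch();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] = new Thread(() -> {
        for (int j = 0; j < updatesPerThread; j++) {
          metrics.updateResponsesAcked(10, 1.5, 0);
        }
      });
      workers[i].start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException(e);
      }
    }
    return metrics;
  }
}
```

With the `synchronized` keyword in place, `runConcurrently(4, 1000)` should always report 4000 acknowledged responses; without it, lost updates or a corrupted map become possible.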

Comment on lines +976 to +982
  HealthCheckMetrics.HealthCheckFields gatherHealthCheckMetrics() {
    HealthCheckFields healthCheckFields = healthCheckMetrics.new HealthCheckFields();
    healthCheckFields.responseCodes = new HashMap<>();
    healthCheckMetrics.gatherHealthCheckMetrics(healthCheckFields);
    healthCheckMetrics.emitHealthCheckMetrics(healthCheckFields);
    return healthCheckFields;
  }

high

This method accesses ConnectionWorker fields (like waitingRequestQueue and inflightRequestQueue) that are guarded by this.lock. It must acquire the lock before calling gatherHealthCheckMetrics to avoid race conditions and potential ConcurrentModificationException during tests.

  HealthCheckMetrics.HealthCheckFields gatherHealthCheckMetrics() {
    this.lock.lock();
    try {
      HealthCheckFields healthCheckFields = healthCheckMetrics.new HealthCheckFields();
      healthCheckFields.responseCodes = new HashMap<>();
      healthCheckMetrics.gatherHealthCheckMetrics(healthCheckFields);
      healthCheckMetrics.emitHealthCheckMetrics(healthCheckFields);
      return healthCheckFields;
    } finally {
      this.lock.unlock();
    }
  }
References
  1. Ensure that access to shared state is protected by a lock when accessed concurrently by multiple threads.
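
As a rough sketch of the pattern this comment asks for, the example below reads a lock-guarded queue only while holding the same lock that writers use. `QueueSnapshotSketch`, `enqueue`, `ack`, and `gatherQueueStats` are hypothetical names, and the queue of byte sizes is a simplified stand-in for the real request wrappers in ConnectionWorker.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified model of a worker whose queue is guarded by a lock.
class QueueSnapshotSketch {
  private final Lock lock = new ReentrantLock();
  // Guarded by lock. Each entry is the size in bytes of one in-flight request.
  private final Deque<Long> inflightRequestQueue = new ArrayDeque<>();

  void enqueue(long bytes) {
    lock.lock();
    try {
      inflightRequestQueue.addLast(bytes);
    } finally {
      lock.unlock();
    }
  }

  void ack() {
    lock.lock();
    try {
      inflightRequestQueue.pollFirst();
    } finally {
      lock.unlock();
    }
  }

  // Returns {request count, total bytes}. Iterating under the lock means a
  // concurrent enqueue or ack cannot modify the queue mid-iteration.
  long[] gatherQueueStats() {
    lock.lock();
    try {
      long totalBytes = 0;
      for (long bytes : inflightRequestQueue) {
        totalBytes += bytes;
      }
      return new long[] {inflightRequestQueue.size(), totalBytes};
    } finally {
      lock.unlock();
    }
  }
}
```

One trade-off worth weighing: anything done inside the locked region, such as serializing and logging the snapshot, extends the time the lock is held, so it may be preferable to gather under the lock and emit after releasing it.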

Comment on lines 1357 to 1358
inflightRetryCount.set(0L);
this.lock.lock();

high

inflightRetryCount.set(0L) should be moved inside the lock. This prevents a race condition where a concurrent requestCallback decrements the counter after it has been reset to 0 by the cleanup thread, leading to a negative value.

Suggested change
- inflightRetryCount.set(0L);
- this.lock.lock();
+ this.lock.lock();
+ inflightRetryCount.set(0L);
+ try {
References
  1. Ensure that access to shared state is protected by a lock when accessed concurrently by multiple threads.

Comment on lines +1584 to +1586
  if (requestWrapper.retryCount > 0) {
    inflightRetryCount.decrementAndGet();
  }

high

The decrement of inflightRetryCount should be performed while holding this.lock (and should also be handled in early return paths within requestCallback). This ensures consistency with the inflightRequestQueue state and prevents race conditions with cleanupInflightRequests.

References
  1. Ensure that access to shared state is protected by a lock when accessed concurrently by multiple threads.
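
The two comments on inflightRetryCount can be sketched together as follows. This is an illustration under stated assumptions, not the PR's code: `RetryCountSketch` and `send` are hypothetical, the queue holds bare retry counts instead of request wrappers, and the counter is a plain `long` guarded by the lock where the PR appears to use an atomic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical model: the retry counter changes only while the lock is held,
// so it always agrees with the contents of the in-flight queue.
class RetryCountSketch {
  private final ReentrantLock lock = new ReentrantLock();
  // Guarded by lock. Each entry is the retryCount of one in-flight request.
  private final Deque<Integer> inflightRequestQueue = new ArrayDeque<>();
  // Guarded by lock. Number of in-flight requests with retryCount > 0.
  private long inflightRetryCount;

  void send(int retryCount) {
    lock.lock();
    try {
      inflightRequestQueue.addLast(retryCount);
      if (retryCount > 0) {
        inflightRetryCount++;
      }
    } finally {
      lock.unlock();
    }
  }

  void requestCallback() {
    lock.lock();
    try {
      Integer retryCount = inflightRequestQueue.pollFirst();
      if (retryCount == null) {
        // Queue was already drained by cleanup; do not decrement.
        return;
      }
      if (retryCount > 0) {
        inflightRetryCount--;
      }
    } finally {
      lock.unlock();
    }
  }

  void cleanupInflightRequests() {
    lock.lock();
    try {
      // Reset the counter and clear the queue in one critical section.
      inflightRequestQueue.clear();
      inflightRetryCount = 0;
    } finally {
      lock.unlock();
    }
  }

  long inflightRetryCount() {
    lock.lock();
    try {
      return inflightRetryCount;
    } finally {
      lock.unlock();
    }
  }
}
```

Because the decrement is tied to actually removing an entry from the queue, a callback that arrives after cleanup finds nothing to remove and leaves the counter at zero instead of driving it negative.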

// Latency buckets are based on a list of 1.5 ^ n
class HealthCheckMetrics {
// Interval between health checks.
private Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15);

medium

HEALTH_CHECK_INTERVAL should be static final as it is a constant. Additionally, to improve resource efficiency, cache and share a Gson instance instead of creating a new one repeatedly in emitHealthCheckMetrics.

Suggested change
- private Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15);
+ private static final Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15);
+ private static final Gson GSON = new Gson();
References
  1. To improve resource efficiency, cache and share resource-intensive clients or objects instead of creating a new instance for every call.

Comment on lines +435 to +436
Gson gson = new Gson();
log.warning(gson.toJson(healthCheckFields));

medium

Use a reusable GSON instance instead of creating a new one on every call to improve efficiency.

Suggested change
- Gson gson = new Gson();
- log.warning(gson.toJson(healthCheckFields));
+ log.warning(GSON.toJson(healthCheckFields));
References
  1. To improve resource efficiency, cache and share resource-intensive clients or objects instead of creating a new instance for every call.

@@ -249,6 +257,7 @@ class ConnectionWorker implements AutoCloseable {

private final RequestProfiler.RequestProfilerHook requestProfilerHook;
private final TelemetryMetrics telemetryMetrics;
private final HealthCheckMetrics healthCheckMetrics;

Can we mark the whole healthCheckMetrics as guarded by the lock?

/*
* Dump health check metrics as WARNING log.
*/
private void emitHealthCheckMetrics(HealthCheckFields healthCheckFields) {

Why do we only expose three metrics here?
