feat: add periodic WARNING metrics to assist in debugging #12976
agrawal-siddharth wants to merge 1 commit into googleapis:main
Code Review
This pull request introduces a HealthCheckMetrics system within ConnectionWorker to monitor and log stream health, including latency, retry counts, and error responses. It also adds corresponding unit tests to verify metric gathering and threshold logic. The review feedback highlights several critical thread-safety concerns, recommending synchronization for metric updates and ensuring that shared state, such as the retry count and request queues, is accessed within existing locks to prevent race conditions. Additionally, there are suggestions to improve resource efficiency by caching the Gson instance and declaring constants as static final.
  void updateRequestsSent(long bytes) {
    windowedRequestsSent++;
    windowedRequestsSentBytes += bytes;
  }

  void updateResponsesAcked(long bytes, double latencyMilli, int code) {
    windowedResponsesAcked++;
    windowedResponsesAckedBytes += bytes;
    if (latencyMilli > windowedMilliLatencyMax) {
      windowedMilliLatencyMax = latencyMilli;
    }
    windowedMilliLatencySum += latencyMilli;
    windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
  }
The methods updateRequestsSent and updateResponsesAcked must be synchronized to ensure thread safety and visibility. Since gatherHealthCheckMetrics is synchronized and accesses these fields, the update methods must use the same monitor. updateResponsesAcked is particularly critical because it modifies a non-thread-safe HashMap (windowedResponseCodes) and multiple counters, which can be called concurrently from gRPC callback threads.
Suggested change:
-  void updateRequestsSent(long bytes) {
-    windowedRequestsSent++;
-    windowedRequestsSentBytes += bytes;
-  }
-  void updateResponsesAcked(long bytes, double latencyMilli, int code) {
-    windowedResponsesAcked++;
-    windowedResponsesAckedBytes += bytes;
-    if (latencyMilli > windowedMilliLatencyMax) {
-      windowedMilliLatencyMax = latencyMilli;
-    }
-    windowedMilliLatencySum += latencyMilli;
-    windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
-  }
+  synchronized void updateRequestsSent(long bytes) {
+    windowedRequestsSent++;
+    windowedRequestsSentBytes += bytes;
+  }
+  synchronized void updateResponsesAcked(long bytes, double latencyMilli, int code) {
+    windowedResponsesAcked++;
+    windowedResponsesAckedBytes += bytes;
+    if (latencyMilli > windowedMilliLatencyMax) {
+      windowedMilliLatencyMax = latencyMilli;
+    }
+    windowedMilliLatencySum += latencyMilli;
+    windowedResponseCodes.put(code, windowedResponseCodes.getOrDefault(code, 0) + 1);
+  }
References
- Ensure that counters or shared state accessed concurrently by multiple threads are thread-safe, either by using Atomic types or by protecting access with a lock.
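To make the point above concrete, here is a minimal sketch of windowed counters whose writers and reader share one monitor. `WindowedMetricsSketch`, `snapshotAndReset`, and `WindowedMetricsDemo` are hypothetical names used only for illustration; they are not taken from the PR, and the real `HealthCheckMetrics` class may be structured differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the windowed counters; not the PR's actual class. */
class WindowedMetricsSketch {
  private long responsesAcked;
  private long responsesAckedBytes;
  private double latencyMaxMillis;
  private double latencySumMillis;
  private final Map<Integer, Integer> responseCodes = new HashMap<>();

  // Writers take the same monitor (this) as the reader below, so the
  // multi-field update is atomic and visible to the gathering thread.
  synchronized void updateResponsesAcked(long bytes, double latencyMillis, int code) {
    responsesAcked++;
    responsesAckedBytes += bytes;
    latencyMaxMillis = Math.max(latencyMaxMillis, latencyMillis);
    latencySumMillis += latencyMillis;
    responseCodes.merge(code, 1, Integer::sum);
  }

  // Copies the per-code counts and clears the window under the same monitor.
  synchronized Map<Integer, Integer> snapshotAndReset() {
    Map<Integer, Integer> copy = new HashMap<>(responseCodes);
    responseCodes.clear();
    responsesAcked = 0;
    responsesAckedBytes = 0;
    latencyMaxMillis = 0;
    latencySumMillis = 0;
    return copy;
  }

  synchronized long responsesAcked() {
    return responsesAcked;
  }
}

class WindowedMetricsDemo {
  public static void main(String[] args) throws InterruptedException {
    WindowedMetricsSketch metrics = new WindowedMetricsSketch();
    // Two threads play the role of concurrent gRPC callback threads.
    Runnable writer = () -> {
      for (int i = 0; i < 10_000; i++) {
        metrics.updateResponsesAcked(8, 1.5, 0);
      }
    };
    Thread a = new Thread(writer);
    Thread b = new Thread(writer);
    a.start();
    b.start();
    a.join();
    b.join();
    System.out.println(metrics.responsesAcked()); // prints 20000
  }
}
```

Without `synchronized` on the update method, the two writers could lose increments or corrupt the `HashMap`; with it, the total is always the sum of both loops.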
  HealthCheckMetrics.HealthCheckFields gatherHealthCheckMetrics() {
    HealthCheckFields healthCheckFields = healthCheckMetrics.new HealthCheckFields();
    healthCheckFields.responseCodes = new HashMap<>();
    healthCheckMetrics.gatherHealthCheckMetrics(healthCheckFields);
    healthCheckMetrics.emitHealthCheckMetrics(healthCheckFields);
    return healthCheckFields;
  }
This method accesses ConnectionWorker fields (like waitingRequestQueue and inflightRequestQueue) that are guarded by this.lock. It must acquire the lock before calling gatherHealthCheckMetrics to avoid race conditions and potential ConcurrentModificationException during tests.
HealthCheckMetrics.HealthCheckFields gatherHealthCheckMetrics() {
this.lock.lock();
try {
HealthCheckFields healthCheckFields = healthCheckMetrics.new HealthCheckFields();
healthCheckFields.responseCodes = new HashMap<>();
healthCheckMetrics.gatherHealthCheckMetrics(healthCheckFields);
healthCheckMetrics.emitHealthCheckMetrics(healthCheckFields);
return healthCheckFields;
} finally {
this.lock.unlock();
}
}
References
- Ensure that access to shared state is protected by a lock when accessed concurrently by multiple threads.
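The locking pattern in this suggestion can be sketched in isolation. The class below is a hypothetical, heavily simplified worker (`QueueWorkerSketch`, `markSent`, and `gatherQueueSizes` are illustrative names, not the PR's API); it only shows that every reader of the queues takes the same `ReentrantLock` as the writers.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical simplified worker; only the locking pattern mirrors the review comment. */
class QueueWorkerSketch {
  private final Lock lock = new ReentrantLock();
  // Both queues are guarded by lock.
  private final Deque<String> waitingQueue = new ArrayDeque<>();
  private final Deque<String> inflightQueue = new ArrayDeque<>();

  void enqueue(String request) {
    lock.lock();
    try {
      waitingQueue.addLast(request);
    } finally {
      lock.unlock();
    }
  }

  // Moves the oldest waiting request to the inflight queue, if there is one.
  void markSent() {
    lock.lock();
    try {
      String request = waitingQueue.pollFirst();
      if (request != null) {
        inflightQueue.addLast(request);
      }
    } finally {
      lock.unlock();
    }
  }

  // Reads both sizes under the lock so the pair is one consistent snapshot:
  // a request can never be counted in both queues or in neither.
  int[] gatherQueueSizes() {
    lock.lock();
    try {
      return new int[] {waitingQueue.size(), inflightQueue.size()};
    } finally {
      lock.unlock();
    }
  }
}
```

One trade-off to keep in mind: anything done while holding the lock (for example, serializing and logging the metrics) delays the threads that append and acknowledge requests, so it may be worth gathering under the lock and emitting after releasing it.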
    inflightRetryCount.set(0L);
    this.lock.lock();
inflightRetryCount.set(0L) should be moved inside the lock. This prevents a race condition where a concurrent requestCallback decrements the counter after it has been reset to 0 by the cleanup thread, leading to a negative value.
Suggested change:
-    inflightRetryCount.set(0L);
-    this.lock.lock();
+    this.lock.lock();
+    inflightRetryCount.set(0L);
+    try {
References
- Ensure that access to shared state is protected by a lock when accessed concurrently by multiple threads.
    if (requestWrapper.retryCount > 0) {
      inflightRetryCount.decrementAndGet();
    }
The decrement of inflightRetryCount should be performed while holding this.lock (and should also be handled in early return paths within requestCallback). This ensures consistency with the inflightRequestQueue state and prevents race conditions with cleanupInflightRequests.
References
- Ensure that access to shared state is protected by a lock when accessed concurrently by multiple threads.
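The race described in the two comments above can be avoided by tying the counter to the queue under a single lock. This is a minimal sketch under stated assumptions: `RetryCounterSketch` and its methods are hypothetical, the queue stores only each request's retry count, and a plain `long` guarded by the lock stands in for the PR's `inflightRetryCount`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: the retry counter changes only together with the inflight queue. */
class RetryCounterSketch {
  private final Lock lock = new ReentrantLock();
  // Retry count of each inflight request, oldest first. Guarded by lock.
  private final Deque<Integer> inflightRetryCounts = new ArrayDeque<>();
  // Number of inflight requests that are retries. Guarded by lock.
  private long inflightRetryCount;

  void addInflight(int retryCount) {
    lock.lock();
    try {
      inflightRetryCounts.addLast(retryCount);
      if (retryCount > 0) {
        inflightRetryCount++;
      }
    } finally {
      lock.unlock();
    }
  }

  // Plays the role of the response callback.
  void onResponse() {
    lock.lock();
    try {
      Integer retryCount = inflightRetryCounts.pollFirst();
      if (retryCount == null) {
        return; // Queue was already cleaned up, so there is nothing to decrement.
      }
      if (retryCount > 0) {
        inflightRetryCount--;
      }
    } finally {
      lock.unlock();
    }
  }

  // Plays the role of the cleanup path: queue and counter are reset atomically.
  void cleanupInflight() {
    lock.lock();
    try {
      inflightRetryCounts.clear();
      inflightRetryCount = 0;
    } finally {
      lock.unlock();
    }
  }

  long inflightRetryCount() {
    lock.lock();
    try {
      return inflightRetryCount;
    } finally {
      lock.unlock();
    }
  }
}
```

Because the decrement happens only when a request is actually removed from the queue, a callback that arrives after cleanup finds an empty queue and leaves the counter at zero instead of driving it negative.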
// Latency buckets are based on a list of 1.5 ^ n
class HealthCheckMetrics {
  // Interval between health checks.
  private Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15);
HEALTH_CHECK_INTERVAL should be static final as it is a constant. Additionally, to improve resource efficiency, cache and share a Gson instance instead of creating a new one repeatedly in emitHealthCheckMetrics.
Suggested change:
-  private Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15);
+  private static final Duration HEALTH_CHECK_INTERVAL = Duration.ofSeconds(15);
+  private static final Gson GSON = new Gson();
References
- To improve resource efficiency, cache and share resource-intensive clients or objects instead of creating a new instance for every call.
    Gson gson = new Gson();
    log.warning(gson.toJson(healthCheckFields));
Use a reusable GSON instance instead of creating a new one on every call to improve efficiency.
Suggested change:
-    Gson gson = new Gson();
-    log.warning(gson.toJson(healthCheckFields));
+    log.warning(GSON.toJson(healthCheckFields));
References
- To improve resource efficiency, cache and share resource-intensive clients or objects instead of creating a new instance for every call.
8d4cef8 to f2bf77d
f2bf77d to 4b9d050
@@ -249,6 +257,7 @@ class ConnectionWorker implements AutoCloseable {

  private final RequestProfiler.RequestProfilerHook requestProfilerHook;
  private final TelemetryMetrics telemetryMetrics;
  private final HealthCheckMetrics healthCheckMetrics;
Can we mark the whole healthCheckMetrics as guarded by lock?
  /*
   * Dump health check metrics as WARNING log.
   */
  private void emitHealthCheckMetrics(HealthCheckFields healthCheckFields) {
Why do we only expose three metrics here?
No description provided.