diff --git a/src/ServiceControl.Transports.AzureStorageQueues/QueueLengthProvider.cs b/src/ServiceControl.Transports.AzureStorageQueues/QueueLengthProvider.cs index d11adf7..c8655ef 100644 --- a/src/ServiceControl.Transports.AzureStorageQueues/QueueLengthProvider.cs +++ b/src/ServiceControl.Transports.AzureStorageQueues/QueueLengthProvider.cs @@ -16,9 +16,8 @@ public class QueueLengthProvider : IProvideQueueLength { - ConcurrentDictionary queues = new ConcurrentDictionary(); - ConcurrentDictionary sizes = new ConcurrentDictionary(); - ConcurrentDictionary problematicQueues = new ConcurrentDictionary(); + ConcurrentDictionary queueLengths = new ConcurrentDictionary(); + ConcurrentDictionary problematicQueuesNames = new ConcurrentDictionary(); string connectionString; QueueLengthStore store; @@ -38,19 +37,15 @@ public void Process(EndpointInstanceId endpointInstanceId, EndpointMetadataRepor var queueName = QueueNameSanitizer.Sanitize(metadataReport.LocalAddress); var queueClient = CloudStorageAccount.Parse(connectionString).CreateCloudQueueClient(); - var queue = queueClient.GetQueueReference(queueName); - queues.AddOrUpdate(endpointInputQueue, _ => queue, (_, currentQueue) => + var emptyQueueLength = new QueueLengthValue { - if (currentQueue.Name.Equals(queue.Name) == false) - { - sizes.TryRemove(currentQueue, out var _); - } - - return queue; - }); + QueueName = queueName, + Length = 0, + QueueReference = queueClient.GetQueueReference(queueName) + }; - sizes.TryAdd(queue, 0); + queueLengths.AddOrUpdate(endpointInputQueue, _ => emptyQueueLength, (_, existingQueueLength) => existingQueueLength); } public void Process(EndpointInstanceId endpointInstanceId, TaggedLongValueOccurrence metricsReport) @@ -100,28 +95,31 @@ void UpdateQueueLengthStore() { var nowTicks = DateTime.UtcNow.Ticks; - foreach (var tableNamePair in queues) + foreach (var endpointQueueLengthPair in queueLengths) { - store.Store( - new[]{ new RawMessage.Entry - { - DateTicks = nowTicks, - Value = sizes.TryGetValue(tableNamePair.Value, out var size) ? size : 0 - }}, - tableNamePair.Key); + var queueLengthEntry = new RawMessage.Entry + { + DateTicks = nowTicks, + Value = endpointQueueLengthPair.Value.Length + }; + + store.Store(new[]{ queueLengthEntry }, endpointQueueLengthPair.Key); } } - Task FetchQueueSizes(CancellationToken token) => Task.WhenAll(sizes.Select(kvp => FetchLength(kvp.Key, token))); + Task FetchQueueSizes(CancellationToken token) => Task.WhenAll(queueLengths.Select(kvp => FetchLength(kvp.Value, token))); - async Task FetchLength(CloudQueue queue, CancellationToken token) + async Task FetchLength(QueueLengthValue queueLength, CancellationToken token) { try { - await queue.FetchAttributesAsync(token).ConfigureAwait(false); - sizes[queue] = queue.ApproximateMessageCount.GetValueOrDefault(); + var queueReference = queueLength.QueueReference; + + await queueReference.FetchAttributesAsync(token).ConfigureAwait(false); + + queueLength.Length = queueReference.ApproximateMessageCount.GetValueOrDefault(); - problematicQueues.TryRemove(queue, out _); + problematicQueuesNames.TryRemove(queueLength.QueueName, out _); } catch (OperationCanceledException) { @@ -130,13 +128,20 @@ async Task FetchLength(CloudQueue queue, CancellationToken token) catch (Exception ex) { // simple "log once" approach to do not flood logs - if (problematicQueues.TryAdd(queue, queue)) + if (problematicQueuesNames.TryAdd(queueLength.QueueName, queueLength.QueueName)) { - Logger.Error($"Obtaining Azure Storage Queue count failed for '{queue.Name}'", ex); + Logger.Error($"Obtaining Azure Storage Queue count failed for '{queueLength.QueueName}'", ex); } } } + class QueueLengthValue + { + public string QueueName; + public volatile int Length; + public CloudQueue QueueReference; + } + static TimeSpan QueryDelayInterval = TimeSpan.FromMilliseconds(200); static ILog Logger = LogManager.GetLogger(); }