Open ttvrdon opened 10 months ago
Can you share your testing code?
```csharp
using Azure.Identity;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;

var ehNamespace = "<EventHub NamespaceName>";
var ehSharedKeyName = "<KeyName>";
var ehSharedKey = "<KeyValue>";
var ehName = "<EventHubName>";
var consumerGroup = "<ConsumerGroupName>";
var storageAccountUrl = "<StorageAccountUrl>";
var storageContainerName = "<ContainerName>";

// EH client
var ehConsumerClient = new EventHubConsumerClient(
    consumerGroup,
    $"Endpoint=sb://{ehNamespace}.servicebus.windows.net/;SharedAccessKeyName={ehSharedKeyName};SharedAccessKey={ehSharedKey};EntityPath={ehName}");
var partitionIds = await ehConsumerClient.GetPartitionIdsAsync();

// Checkpoint blobs from the storage account
var checkpointBlobs = await GetCheckpointBlobClients(new Uri(storageAccountUrl), storageContainerName, ehNamespace, ehName, consumerGroup);

// Report the unprocessed event count roughly every 30 seconds
while (true)
{
    var checkpoints = await GetCheckpoints(checkpointBlobs);
    long unprocessed = 0;
    foreach (var partitionId in partitionIds)
    {
        var props = await ehConsumerClient.GetPartitionPropertiesAsync(partitionId);
        // Partitions without a checkpoint blob yet count from sequence number 0
        checkpoints.TryGetValue(partitionId, out var checkpoint);
        unprocessed += props.LastEnqueuedSequenceNumber - checkpoint.sequencenumber;
    }
    Console.WriteLine($"{DateTime.UtcNow} - Unprocessed: {unprocessed}");
    await Task.Delay(TimeSpan.FromSeconds(30));
}

static async Task<Dictionary<string, (long offset, long sequencenumber)>> GetCheckpoints(
    IList<(string partitionId, BlobClient blobClient)> checkpointBlobClients)
{
    var checkpoints = new Dictionary<string, (long offset, long sequencenumber)>();
    foreach (var checkpoint in checkpointBlobClients)
    {
        // Current Event Hubs SDKs store offset/sequence number in blob metadata
        var props = await checkpoint.blobClient.GetPropertiesAsync();
        var offset = long.Parse(props.Value.Metadata["offset"]);
        var sequenceNumber = long.Parse(props.Value.Metadata["sequencenumber"]);
        checkpoints[checkpoint.partitionId] = (offset, sequenceNumber);
    }
    return checkpoints;
}

static async Task<IList<(string partitionId, BlobClient blobClient)>> GetCheckpointBlobClients(
    Uri storageAccountUrl, string containerName, string ehNamespace, string ehName, string consumerGroup)
{
    var blobServiceClient = new BlobServiceClient(storageAccountUrl, new DefaultAzureCredential());
    var containerClient = blobServiceClient.GetBlobContainerClient(containerName);
    var checkpointBlobs = new List<(string partitionId, BlobClient blobClient)>();

    // Checkpoint blobs are named <namespace FQDN>/<hub>/<consumer group>/checkpoint/<partitionId>
    await foreach (BlobItem blobItem in containerClient.GetBlobsAsync(
        prefix: $"{ehNamespace}.servicebus.windows.net/{ehName}/{consumerGroup}/checkpoint"))
    {
        var partitionId = blobItem.Name.Substring(blobItem.Name.LastIndexOf('/') + 1);
        var blobClient = containerClient.GetBlobClient(blobItem.Name);
        checkpointBlobs.Add((partitionId, blobClient));
    }
    return checkpointBlobs;
}
```
We are seeing the same issue with Redis Streams. The app successfully scaled to 10 replicas but didn't scale down after all messages had been acked.
In our case we had misconfigured one of the secrets, but without verbose logging we had no idea KEDA was rejecting its inputs and scaling out because of that. I suspect that will be your issue too; it's hard to tell what's wrong or why without visibility, unless you carefully inspect every value/secret given to KEDA via ACA.
I am encountering the same issue. I created the same `azure-eventhub` scale rule and used Dapr to process all Event Hubs messages, and the app did not scale down. I suspect the checkpoints are not being shared correctly, since the scale-down only took place once the messages' TTL had passed. (The checkpoint settings should be correct for KEDA/Dapr.)
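One way to check whether KEDA and Dapr are actually looking at the same checkpoints is to inspect the checkpoint blobs directly. A sketch with the Azure CLI (account, container, and path segments are placeholders; current SDKs store the offset and sequence number in the blobs' metadata under a path like the one below):

```
az storage blob list \
  --auth-mode login \
  --account-name <storage_account> \
  --container-name <container_name> \
  --prefix "<namespace>.servicebus.windows.net/<hub>/<consumer_group>/checkpoint/" \
  --include m \
  --query "[].{name:name, metadata:metadata}"
```

If the `sequencenumber` metadata values keep up with the hub's last enqueued sequence numbers while KEDA still reports lag, the scaler is probably not reading those blobs.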
Any updates on this?
I am also facing the same issue.
I'm facing a similar issue... running 4 container apps with the KEDA `azure-eventhub` scale rule, two of which were always maxed out, even though the volume of incoming messages doesn't justify the scaling; even when nothing is coming in, the apps stay scaled at max.
I reviewed the configuration and found that the two apps that appeared fine were actually not properly configured. Once their configuration was corrected, they started suffering from the same issue too.
```yaml
activationUnprocessedEventThreshold: 10
blobContainer: <container_name>
connectionFromEnv: <connection_env>
consumerGroup: <consumer_group>
eventHubNameFromEnv: <hub_name_env>
storageConnectionFromEnv: <storage_connection_env>
unprocessedEventThreshold: 64
```
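For reference, in a Container Apps template this metadata sits under a custom scale rule. A minimal sketch of how such a rule might be wired up (the rule name and replica bounds are placeholders, not taken from this thread):

```yaml
scale:
  minReplicas: 0
  maxReplicas: 10
  rules:
    - name: eventhub-scale-rule   # hypothetical rule name
      custom:
        type: azure-eventhub
        metadata:
          activationUnprocessedEventThreshold: "10"
          unprocessedEventThreshold: "64"
          blobContainer: <container_name>
          consumerGroup: <consumer_group>
          connectionFromEnv: <connection_env>
          eventHubNameFromEnv: <hub_name_env>
          storageConnectionFromEnv: <storage_connection_env>
```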
Are there any updates on this?
Alright... sorted out my own issue. Looking at the latest version of the scaler (2.14), I found this new parameter (or at least I don't remember seeing it before):

> checkpointStrategy - configure the checkpoint behaviour of different Event Hub SDKs. (Values: azureFunction, blobMetadata, goSdk, default: "", Optional)
And a bit further down it says:

> When no checkpoint strategy is specified, the Event Hub scaler will use backwards compatibility and able to scale older implementations of C#, Python or Java Event Hub SDKs. (see “Legacy checkpointing”). If this behaviour should be used, blobContainer is also required.
It came as a surprise that the default is legacy checkpointing; to be honest, I expected it the other way around. Nonetheless, after setting this to `blobMetadata` to suit my case, the auto-scaler started working.
Hopefully this will help someone in a similar situation.
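Concretely, assuming the scale rule metadata posted earlier in the thread, the fix amounts to one extra key (a sketch, not a full template):

```yaml
# Current Azure SDKs (and Dapr) write checkpoints to blob *metadata*;
# the scaler only reads metadata when this strategy is set explicitly.
checkpointStrategy: blobMetadata
activationUnprocessedEventThreshold: 10
blobContainer: <container_name>
connectionFromEnv: <connection_env>
consumerGroup: <consumer_group>
eventHubNameFromEnv: <hub_name_env>
storageConnectionFromEnv: <storage_connection_env>
unprocessedEventThreshold: 64
```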
Issue description
An Azure Container App is processing data from Event Hubs and is configured as follows:

and is using a KEDA scale rule of type `azure-eventhub` with the following settings:

When there was a high count of unprocessed messages, the app scaled to its defined maximum of 10 replicas. However, even though the unprocessed count has been low for a long time now, the app is not scaling down and stays at 10 replicas.
I created testing code to detect the unprocessed count in Event Hubs and ran it approximately every 30 seconds (a similar interval to the scale rule evaluation), with the following results:
In the KEDA source code I found that the actual metrics are being logged (keda/pkg/scalers/azure_eventhub_scaler.go, lines 389 and 396), but these logs are not available in Azure Log Analytics. How can verbose logging for KEDA be enabled?
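Pending an answer on verbose scaler logs, the Container Apps system log table in Log Analytics can surface scaling and KEDA-related errors. A sketch of a query (the table and column names assume the default `ContainerAppSystemLogs_CL` custom table; adjust for your workspace):

```
ContainerAppSystemLogs_CL
| where ContainerAppName_s == "<app-name>"
| where Log_s has "eventhub" or Log_s has "scale"
| project TimeGenerated, Log_s
| order by TimeGenerated desc
```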
Steps to reproduce
Create the `azure-eventhub` scale rule as described above.

Expected behavior
When the unprocessed count drops below the defined threshold, the app should scale back down (gradually, to the minimum replica count).

Actual behavior
The replica count stays at the maximum.