Azure / azure-functions-kafka-extension

Kafka extension for Azure Functions
MIT License

Azure Functions - Kafka Trigger and Output Binding - Consume exactly-once #408

Open monteiro-alan opened 1 year ago

monteiro-alan commented 1 year ago

I'm testing the Kafka trigger and output binding in Azure Functions to consume a topic and write each message to another topic. The code is very simple.

But when I enable the auto-scale feature and the function provisions new instances, I lose the 'exactly-once' behaviour, and apparently some messages are delivered to more than one instance.

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.6.0, 4.0.0)"
  }
}

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "kafkaTrigger",
      "name": "kafkaTrigger",
      "direction": "in",
      "brokerList": "%PEP_BEES_KAFKA_BOOTSTRAP%",
      "topic": "%PEP_BEES_KAFKA_SOURCE_TOPIC%",
      "consumerGroup": "%PEP_BEES_KAFKA_SOURCE_TOPIC_CONSUMER_GROUP%"
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "kafkaOutput",
      "brokerList": "%PEP_BEES_KAFKA_BOOTSTRAP%",
      "topic": "%PEP_BEES_KAFKA_DESTINATION_TOPIC%"
    }
  ]
}

__init__.py

import json

import azure.functions as func
from azure.functions import KafkaEvent


def main(kafkaTrigger: KafkaEvent, kafkaOutput: func.Out[str]):
    # function.json sets no batch cardinality, so a single event arrives per invocation.
    message = json.loads(kafkaTrigger.get_body().decode('utf-8'))

    # Forward the record value to the destination topic.
    input_msg = str(message['Value'])
    kafkaOutput.set(input_msg)

As you can see in the image below, the 'test-topic-output' (destination) has more messages than the 'test-topic' (source), indicating that sometimes more than one instance is consuming a message:

image

If I disable the auto-scaling feature, this behavior does not happen.

I just need the 'exactly-once' behaviour to hold even with function auto-scaling enabled, so that the environment stays elastic.
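To quantify the duplication instead of comparing topic totals by eye, one option is to dump the destination topic and count how often each payload appears. This is a minimal sketch, assuming every payload in the source topic is unique and that both topics have already been read into Python lists. The function name `find_duplicates` and the sample data are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable


def find_duplicates(destination_values: Iterable[str]) -> Dict[str, int]:
    """Return every value seen more than once, mapped to how often it was seen."""
    counts = Counter(destination_values)
    return {value: n for value, n in counts.items() if n > 1}


# Hypothetical data: "b" reached the destination topic twice.
source = ["a", "b", "c"]
destination = ["a", "b", "b", "c"]

duplicates = find_duplicates(destination)
extra = len(destination) - len(source)
print(f"{extra} extra message(s); duplicated values: {duplicates}")
```

If the payloads are not unique, the same counting works on any identifier embedded in the message instead of the whole value.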

shrohilla commented 1 year ago

Hi @monteiro-alan, I'm not sure whether you are using the latest NuGet package of the Kafka extension. To be 100% sure, could you please try the manual install approach shared in the samples?

Make the changes in host.json, add extensions.csproj, and update the NuGet package version to 3.6.0.

But I have a few pointers to validate:

If the above two issues are also part of the debugging, maybe our next release, where we are addressing the producer performance, will solve the problem.

monteiro-alan commented 1 year ago

Hi @shrohilla, I tried installing the Kafka extension manually, but the behaviour was the same.

I noticed some errors in Application Insights; I'm not sure whether they are related to the problem:

Confluent.Kafka.KafkaException:
   at Confluent.Kafka.Impl.SafeKafkaHandle.StoreOffsets (Confluent.Kafka, Version=1.9.0.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e)
   at Confluent.Kafka.Consumer`2.StoreOffset (Confluent.Kafka, Version=1.9.0.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.AsyncCommitStrategy`2.Commit (Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.6.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35: /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AsyncCommitStrategy.cs:28)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.FunctionExecutorBase`2.Commit (Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.6.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35: /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs:87)

shrohilla commented 1 year ago

@monteiro-alan could you please try tuning the value of AutoCommitIntervalMs?

monteiro-alan commented 1 year ago

@shrohilla, I tried 5 ms and 500 ms: same problem and same errors.

shrohilla commented 1 year ago

@monteiro-alan, I have just released the 3.7.0 NuGet package, where we fixed the performance. Could you please try that without the AutoCommitIntervalMs changes?

monteiro-alan commented 1 year ago

Hi @shrohilla, the performance is now incredibly better, but I still get some errors and duplicate consumption when the function scales and a Kafka partition rebalance occurs:

Screen Shot 2022-12-14 at 10 20 59

Confluent.Kafka.KafkaException: Local: Erroneous state
   at Confluent.Kafka.Impl.SafeKafkaHandle.StoreOffsets(IEnumerable`1 offsets)
   at Confluent.Kafka.Consumer`2.StoreOffset(TopicPartitionOffset offset)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.AsyncCommitStrategy`2.Commit(IEnumerable`1 topicPartitionOffsets) in /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AsyncCommitStrategy.cs:line 28
   at Microsoft.Azure.WebJobs.Extensions.Kafka.FunctionExecutorBase`2.Commit(IEnumerable`1 topicPartitionOffsets) in /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs:line 87

shrohilla commented 1 year ago

@monteiro-alan I would recommend setting maxBatchSize and adding the channel capacity in host.json so that more events are executed at once. I'm not sure, but it looks like you are currently executing a single event at a time, which may be why you are facing this issue.

monteiro-alan commented 1 year ago

@shrohilla are there any recommendations for these parameters? I tried batch mode, and in scenarios with a lot of messages (e.g. 100000 messages) that trigger a scale-out and a rebalance, I still have the duplicate message issue.
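For reference, the parsing step of a batch-mode handler might look roughly like the sketch below. It assumes the trigger binding in function.json is configured for batches (for example with "cardinality": "many") so that the function receives a list of events, and that each event body is the same JSON envelope with a 'Value' field used by the handler in the original post. The helper `extract_values` and the `FakeEvent` stand-in are hypothetical; they exist only so the sketch runs without the Functions runtime.

```python
import json
from typing import Iterable, List, Protocol


class HasBody(Protocol):
    """Anything exposing get_body() -> bytes, as KafkaEvent does in the original handler."""

    def get_body(self) -> bytes: ...


def extract_values(events: Iterable[HasBody]) -> List[str]:
    """Decode each event's JSON envelope and collect its 'Value' field."""
    values = []
    for event in events:
        envelope = json.loads(event.get_body().decode("utf-8"))
        values.append(str(envelope["Value"]))
    return values


class FakeEvent:
    """Hypothetical stand-in for KafkaEvent, used only for this demonstration."""

    def __init__(self, value: str) -> None:
        self._body = json.dumps({"Value": value}).encode("utf-8")

    def get_body(self) -> bytes:
        return self._body


batch = [FakeEvent("first"), FakeEvent("second")]
print(extract_values(batch))
```

In a real function, the list passed to `main` would be handed to `extract_values`. As reported in this thread, batching on its own did not remove the duplicates.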

monteiro-alan commented 1 year ago

@shrohilla tried with:

{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 100,
      "ExecutorChannelCapacity": 1
    }
  },
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  }
}

Result: Screen Shot 2022-12-14 at 11 58 47

shrohilla commented 1 year ago

@monteiro-alan you need to tune that based on your number of events and their payload size. Moreover, since you are doing some JSON payload work, you need to tune your trigger params, such as AutoCommitIntervalMs. You may refer to our documentation for that.

Based on your screenshot, I would say you need to increase maxBatchSize to at least 500 and ExecutorChannelCapacity to a minimum of 50 events for caching.
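When trying several combinations of these two settings, it can help to generate host.json instead of editing it by hand. The sketch below is only an illustration: the helper name `with_kafka_settings` is hypothetical, and the key names are copied from the host.json posted earlier in this thread, not checked against the extension's documentation.

```python
import copy
import json
from typing import Any, Dict


def with_kafka_settings(host: Dict[str, Any], max_batch_size: int,
                        channel_capacity: int) -> Dict[str, Any]:
    """Return a copy of a host.json dict with the Kafka extension settings applied."""
    updated = copy.deepcopy(host)  # leave the caller's dict untouched
    kafka = updated.setdefault("extensions", {}).setdefault("kafka", {})
    kafka["maxBatchSize"] = max_batch_size
    kafka["ExecutorChannelCapacity"] = channel_capacity
    return updated


base = {"version": "2.0"}
tuned = with_kafka_settings(base, max_batch_size=500, channel_capacity=50)
print(json.dumps(tuned, indent=2))
```

Writing the printed JSON to host.json before each test run keeps the tried combinations reproducible.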

monteiro-alan commented 1 year ago

@shrohilla I tried maxBatchSize 500 and ExecutorChannelCapacity 50, without setting AutoCommitIntervalMs, but the result was worse than the previous ones. Screen Shot 2022-12-14 at 13 03 26

I'm testing with 100000 messages of 100 KB each, using the Kafka console performance test.

shrohilla commented 1 year ago

@monteiro-alan it looks like you need to tune the params; for that, please check the documentation.

monteiro-alan commented 1 year ago

@shrohilla I didn't find any recommendations in the documentation, only descriptions of the params. Where can I find them?

monteiro-alan commented 1 year ago

@shrohilla I think this problem is related to the errors showing up in Application Insights. They are offset store errors. It seems that the function sometimes fails to store the offsets, so the message is processed more than once:

Confluent.Kafka.KafkaException:
   at Confluent.Kafka.Impl.SafeKafkaHandle.StoreOffsets (Confluent.Kafka, Version=1.9.0.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e)
   at Confluent.Kafka.Consumer`2.StoreOffset (Confluent.Kafka, Version=1.9.0.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.AsyncCommitStrategy`2.Commit (Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.7.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35: /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AsyncCommitStrategy.cs:28)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.FunctionExecutorBase`2.Commit (Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.7.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35: /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs:87)

monteiro-alan commented 1 year ago

@shrohilla I tried with another Kafka cluster to make sure it wasn't a Kafka problem and the problem continued.

I noticed that when some messages are consumed more than once, the Kafka errors that I already mentioned appear. When consumed only once, no errors are generated in Application Insights.
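This correlation is what one would expect if storing an offset fails for a partition that has just been reassigned: the new owner resumes from the last committed offset and re-reads whatever was processed but not committed. The toy simulation below illustrates only that sequence. It is not the extension's code, and the names `Partition`, `consume`, and `fail_store_at` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Partition:
    records: List[str]
    committed: int = 0  # next offset a new owner would start from
    delivered: List[str] = field(default_factory=list)  # what reached the output topic


def consume(partition: Partition, fail_store_at: Optional[int] = None) -> None:
    """Process records from the committed offset; stop if storing an offset fails."""
    offset = partition.committed
    while offset < len(partition.records):
        # The side effect (writing to the output topic) happens before the commit.
        partition.delivered.append(partition.records[offset])
        if offset == fail_store_at:
            return  # offset not stored: the partition was revoked mid-flight
        partition.committed = offset + 1
        offset += 1


p = Partition(records=["m0", "m1", "m2"])
consume(p, fail_store_at=1)  # first instance loses the partition after producing m1
consume(p)                   # new owner resumes from the last committed offset
print(p.delivered)
```

In this model the record whose offset could not be stored is the one that appears twice, which matches the observation that duplicates and offset store errors show up together.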

shrohilla commented 1 year ago

@monteiro-alan you need to tune the params for your application, such as AutoCommitIntervalMs as I mentioned. Or, if producing your output is time-consuming, you might need to update LingerMs.

Can you share the sample payload?

monteiro-alan commented 1 year ago

@shrohilla I'm using the kafka-producer-perf-test.sh that comes with Kafka to test performance. I'm generating with the command:

/kafka-producer-perf-test.sh --topic test-topic --num-records 30000 --record-size 50000 --throughput -1 --producer-props acks=all bootstrap.servers=#BOOTSTRAP_SERVER#
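For scale, the load this command generates can be worked out from its two size flags. The figures below come straight from `--num-records` and `--record-size` (bytes per record) and ignore protocol overhead; the variable names are illustrative.

```python
num_records = 30_000   # --num-records
record_size = 50_000   # --record-size, in bytes

total_bytes = num_records * record_size
print(f"{total_bytes:,} bytes = {total_bytes / 1e9:.1f} GB")
```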

I've tried several configurations, without success. When I disable "Runtime Scale Monitoring" (which is necessary to scale the application), so that the function does not scale, messages are no longer processed more than once.

kWozniak-tt commented 1 year ago

@monteiro-alan I have the same issue. Did you manage to solve the problem?

gastonmuijtjens commented 8 months ago

@shrohilla

I am using a Kafka trigger with an Azure Event Hub on .NET isolated functions (Microsoft.Azure.Functions.Worker.Extensions.Kafka 3.10.1) and I am facing the exact same issue. Enabling Runtime Scale Monitoring causes the following exception (one for almost every message):

Commit error: Local: Erroneous state    

Confluent.Kafka.KafkaException:
   at Confluent.Kafka.Impl.SafeKafkaHandle.StoreOffsets (Confluent.Kafka, Version=1.9.0.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e)
   at Confluent.Kafka.Consumer`2.StoreOffset (Confluent.Kafka, Version=1.9.0.0, Culture=neutral, PublicKeyToken=12c514ca49093d1e)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.AsyncCommitStrategy`2.Commit (Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.9.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35: /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/AsyncCommitStrategy.cs:28)
   at Microsoft.Azure.WebJobs.Extensions.Kafka.FunctionExecutorBase`2.Commit (Microsoft.Azure.WebJobs.Extensions.Kafka, Version=3.9.0.0, Culture=neutral, PublicKeyToken=31bf3856ad364e35: /mnt/vss/_work/1/s/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/FunctionExecutorBase.cs:87)

Disabling Runtime Scale Monitoring does not cause any exceptions, but when sending over >100.000 messages to the Event Hub they are not processed in time since the Azure Function does not scale automatically.
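If scaling has to stay enabled, one common way to live with redelivery is to make the handler idempotent, so that a record delivered twice is forwarded only once. The sketch below assumes the trigger's JSON envelope carries 'Topic', 'Partition', and 'Offset' alongside 'Value' (only 'Value' appears in this thread, so treat the other fields as an assumption). It also uses an in-memory set where a real deployment would need a store shared by all instances. `SeenStore` and `handle` are hypothetical names.

```python
import json
from typing import Callable, Set, Tuple

RecordKey = Tuple[str, int, int]  # (topic, partition, offset)


class SeenStore:
    """In-memory stand-in for a store shared across function instances."""

    def __init__(self) -> None:
        self._seen: Set[RecordKey] = set()

    def add_if_new(self, key: RecordKey) -> bool:
        """Record the key; return False if it had been recorded before."""
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


def handle(body: bytes, store: SeenStore, forward: Callable[[str], None]) -> bool:
    """Forward the record value unless this record was already handled."""
    envelope = json.loads(body.decode("utf-8"))
    # Assumed envelope fields; verify them against a real event before relying on this.
    key = (envelope["Topic"], envelope["Partition"], envelope["Offset"])
    if not store.add_if_new(key):
        return False  # duplicate delivery: skip it
    forward(str(envelope["Value"]))
    return True


store = SeenStore()
sent = []
body = json.dumps(
    {"Topic": "test-topic", "Partition": 0, "Offset": 42, "Value": "hello"}
).encode("utf-8")
handle(body, store, sent.append)
handle(body, store, sent.append)  # simulated redelivery after a rebalance
print(sent)
```

The sketch marks a record as seen before forwarding it, so a failure between the two steps would drop the record; a production version would have to handle that case explicitly.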

@monteiro-alan Did you already find a solution?

EDIT: