redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

read_until intermittently fails to ACK the last message due to canceled context #2250

Open eduardodbr opened 10 months ago

eduardodbr commented 10 months ago

It seems that the context is sometimes canceled before the child input of read_until has ACKed the last message.

Steps to reproduce:

  1. Run Azurite locally: docker run -p 10000:10000 -p 10001:10001 -p 10002:10002 mcr.microsoft.com/azure-storage/azurite:3.27.0
  2. Create the queue in Azurite: az storage queue create --name test --account-name test --connection-string "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
  3. Produce a message: az storage message put -q test --content '{"foo":"bar"}' --account-name test --connection-string "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
  4. Run Benthos with the following pipeline config (pipeline.yaml):
input:
  read_until:
    input:
      azure_queue_storage:
        storage_account: "test"
        queue_name: "test"
        dequeue_visibility_timeout: 5s
        storage_connection_string: "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;QueueEndpoint=http://127.0.0.1:10001/devstoreaccount1;TableEndpoint=http://127.0.0.1:10002/devstoreaccount1;"
    check: 'this.foo == "bar"'
    restart_input: false

When running this test multiple times, the message is sometimes ACKed successfully, and other times the ACK fails because the context has been canceled:

Logs when it fails:

INFO Running main config from specified file       @service=benthos benthos_version=4.23.0 path=pipeline.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
{"foo":"bar"}
ERRO Failed to acknowledge message: error deleting message: context canceled  @service=benthos label="" path=root.input.read_until.input
INFO Pipeline has terminated. Shutting down the service  @service=benthos

Logs when the message is ACKed successfully:

INFO Running main config from specified file       @service=benthos benthos_version=4.23.0 path=pipeline.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
{"foo":"bar"}
INFO Pipeline has terminated. Shutting down the service  @service=benthos
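Since the failure is intermittent, a common way to close this kind of race is to wait for in-flight acks, bounded by a timeout, before canceling the shared context. The Go sketch below shows that pattern under the same assumption as above; `ackTracker`, `deleteMessage` and `shutdownAfterDrain` are hypothetical names, and this is not necessarily how Benthos implements or would fix shutdown.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ackTracker is a hypothetical helper that counts in-flight acks so that
// shutdown can wait for them before canceling the shared context.
type ackTracker struct {
	wg sync.WaitGroup
}

// Go runs ack in the background, records it as in flight, and sends its
// result on errs.
func (t *ackTracker) Go(ack func() error, errs chan<- error) {
	t.wg.Add(1)
	go func() {
		defer t.wg.Done()
		errs <- ack()
	}()
}

// Drain waits for all in-flight acks, but gives up after timeout so a stuck
// broker call cannot block shutdown forever. It reports whether all finished.
func (t *ackTracker) Drain(timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		t.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// deleteMessage simulates a broker delete call that honors cancellation.
func deleteMessage(ctx context.Context) error {
	select {
	case <-time.After(50 * time.Millisecond): // simulated round trip
		return nil
	case <-ctx.Done():
		return fmt.Errorf("error deleting message: %w", ctx.Err())
	}
}

// shutdownAfterDrain acks one message and cancels the shared context only
// after the ack has finished (or the drain timeout has expired).
func shutdownAfterDrain() (bool, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var tracker ackTracker
	errs := make(chan error, 1)
	tracker.Go(func() error { return deleteMessage(ctx) }, errs)

	// Wait for pending acks first, then cancel.
	drained := tracker.Drain(time.Second)
	cancel()
	return drained, <-errs
}

func main() {
	drained, err := shutdownAfterDrain()
	fmt.Println(drained, err) // true <nil>
}
```

An alternative design is to give each ack its own context derived with `context.WithoutCancel` (Go 1.21+) plus a timeout, so that shutdown cancellation never reaches in-flight deletes.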

The same behavior was also reproduced with the aws_sqs input against LocalStack:

input:
  read_until:
    input:
      aws_sqs:
        url: "http://localhost:4566/000000000000/test" 
        endpoint: "http://localhost:4566"
        region: us-east-1
        credentials:
          profile: "localstack"
    check: 'this.foo == "bar"'
    restart_input: false

Logs when it fails:
INFO Running main config from specified file       @service=benthos benthos_version=4.23.0 path=pipeline.yaml
INFO Listening for HTTP requests at: http://0.0.0.0:4195  @service=benthos
INFO Receiving Amazon SQS messages from URL: http://localhost:4566/000000000000/test  @service=benthos label="" path=root.input.read_until.input
INFO Launching a benthos instance, use CTRL+C to close  @service=benthos
{"foo":"bar"}
ERRO Failed to acknowledge message: context canceled  @service=benthos label="" path=root.input.read_until.input
INFO Pipeline has terminated. Shutting down the service  @service=benthos

AdilMektoub commented 5 months ago

Hi @eduardodbr, thank you for this issue. I have the same problem on EKS with Benthos ("Pipeline has terminated. Shutting down the service"). The problem is that when Benthos receives no input requests, or finishes processing, the service shuts down automatically. If you want to work around it, just add "shutdown_delay: 3600s" (or more) after the output section in your ConfigMap.

AnithaG-Oak commented 1 month ago

Hello @Jeffail,

We have been seeing a similar issue sporadically ever since we upgraded Benthos from 4.14 to 4.27. Benthos stops reading messages from SQS, and we see this error in the logs:

level=error msg="Failed to acknowledge message: context canceled" @service=benthos label="" path=root.input stream=ledger_balance_exported

The issue occurs in the stream with a high message load, and it does not happen every time. Any help on this is appreciated.

Thanks, Anitha