Azure / azure-event-hubs-go

Golang client library for Azure Event Hubs https://azure.microsoft.com/services/event-hubs
MIT License

Event Processor Host does not start consuming messages #276

Closed BICKELC closed 1 year ago

BICKELC commented 1 year ago

After upgrading azure-event-hubs-go to 3.3.19, the Event Processor Host no longer starts consuming messages. I am using the following code (based on the example in the README):

package main

import (
    "context"
    "fmt"
    "os"
    "os/signal"
    "time"

    eventhub "github.com/Azure/azure-event-hubs-go/v3"
    "github.com/Azure/azure-event-hubs-go/v3/eph"
    "github.com/Azure/azure-event-hubs-go/v3/storage"
    "github.com/Azure/azure-storage-blob-go/azblob"
    "github.com/Azure/go-autorest/autorest/azure"
)

func main() {
    // Azure Storage account information
    storageAccountName := os.Getenv("StorageAccountName")
    fmt.Printf("storageAccountName is: %s\n", storageAccountName)
    storageAccountKey := os.Getenv("StorageAccountKey")
    fmt.Printf("storageAccountKey is: %s\n", storageAccountKey)
    // Azure Storage container to store leases and checkpoints
    storageContainerName := "ncsconsumergroup"

    // Azure Event Hub connection string
    eventHubConnStr := os.Getenv("EventhubConnectionString")
    fmt.Printf("eventHubConnStr is: %s\n", eventHubConnStr)

    // create a new Azure Storage Leaser / Checkpointer
    cred, err := azblob.NewSharedKeyCredential(storageAccountName, storageAccountKey)
    if err != nil {
        fmt.Println(err)
        return
    }

    leaserCheckpointer, err := storage.NewStorageLeaserCheckpointer(cred, storageAccountName, storageContainerName, azure.PublicCloud)
    if err != nil {
        fmt.Println(err)
        return
    }

    ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
    defer cancel()
    // create a new EPH processor
    processor, err := eph.NewFromConnectionString(ctx, eventHubConnStr, leaserCheckpointer, leaserCheckpointer)
    if err != nil {
        fmt.Println(err)
        return
    }

    // register a message handler -- many can be registered
    handlerID, err := processor.RegisterHandler(ctx,
        func(c context.Context, e *eventhub.Event) error {
            fmt.Println(string(e.Data))
            return nil
        })
    if err != nil {
        fmt.Println(err)
        return
    }

    fmt.Printf("handler id: %q is running\n", handlerID)

    // unregister a handler to stop that handler from receiving events
    // processor.UnregisterHandler(ctx, handlerID)

    // start handling messages from all of the partitions balancing across multiple consumers
    err = processor.StartNonBlocking(ctx)
    if err != nil {
        fmt.Println("ERROR on StartNonBlocking")
        fmt.Println(err)
        return
    }

    // Wait for a signal to quit:
    signalChan := make(chan os.Signal, 1)
    signal.Notify(signalChan, os.Interrupt) // os.Kill (SIGKILL) cannot be trapped, so it is not registered
    <-signalChan

    err = processor.Close(context.Background())
    if err != nil {
        fmt.Println(err)
        return
    }
}

The first run of this code works without any issue. But as soon as the blobs already exist in the storage container (that is, from the second run onwards), it fails with the following output:

storageAccountName is: XXX
storageAccountKey is: XXX
eventHubConnStr is: XXX
handler id: "a39d161d-341f-47c9-8414-66fb384ee9e4" is running

    ______                 __  __  __      __
   / ____/   _____  ____  / /_/ / / /_  __/ /_  _____
  / __/ | | / / _ \/ __ \/ __/ /_/ / / / / __ \/ ___/
 / /___ | |/ /  __/ / / / /_/ __  / /_/ / /_/ (__  )
/_____/ |___/\___/_/ /_/\__/_/ /_/\__,_/_.___/____/

ERROR on StartNonBlocking
-> github.com/Azure/azure-event-hubs-go/v3/internal/azure-storage-blob-go/azblob.newStorageError, /go/src/XXX/vendor/github.com/Azure/azure-event-hubs-go/v3/internal/azure-storage-blob-go/azblob/zc_storage_error.go:43
===== RESPONSE ERROR (ServiceCode=BlobAlreadyExists) =====
Description=The specified blob already exists.
RequestId:2f400cd9-901e-0031-724a-e988ee000000
Time:2022-10-26T14:50:50.1040632Z, Details:
   Code: BlobAlreadyExists
   PUT https://XXX.blob.core.windows.net/ncsconsumergroup/0?timeout=20
   Authorization: REDACTED
   Content-Length: [80]
   If-None-Match: [*]
   User-Agent: [Azure-Storage/0.15 (go1.19.2; linux)]
   X-Ms-Blob-Cache-Control: []
   X-Ms-Blob-Content-Disposition: []
   X-Ms-Blob-Content-Encoding: []
   X-Ms-Blob-Content-Language: []
   X-Ms-Blob-Content-Type: []
   X-Ms-Blob-Type: [BlockBlob]
   X-Ms-Client-Request-Id: [2f349a3d-b8d5-470d-57c2-69ce87bf0625]
   X-Ms-Date: [Wed, 26 Oct 2022 14:50:50 GMT]
   X-Ms-Version: [2018-11-09]
   --------------------------------------------------------------------------------
   RESPONSE Status: 409 The specified blob already exists.
   Content-Length: [220]
   Content-Type: [application/xml]
   Date: [Wed, 26 Oct 2022 14:50:49 GMT]
   Server: [Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0]
   X-Ms-Error-Code: [BlobAlreadyExists]
   X-Ms-Request-Id: [2f400cd9-901e-0031-724a-e988ee000000]
   X-Ms-Version: [2018-11-09]

(sensitive values have been replaced with XXX by me, but the output was as expected)

The storage account we are using is of kind Storage (general purpose v1), but the same issue occurs with StorageV2.

The code above is running as a pod on AKS.

While analysing the issue, I realised that version 3.3.18 of azure-event-hubs-go had the same problem, but there the error during setup was ignored: (https://github.com/Azure/azure-event-hubs-go/commit/21382524b6756f318784596600095b52489714b1#diff-e676c998b671657559774d8c366741fbcad16810aa28025ba87b945718d67378L414)

Is there something wrong with my code above?

Expected Behavior

Event Processor Host has no issues on startup.

Actual Behavior

Event Processor Host fails on startup with the error "BlobAlreadyExists" and does not start consuming messages.

Environment

richardpark-msft commented 1 year ago

Hi @BICKELC, I'll take a look. We made a recent change in 3.3.19 that makes it possible to use newer versions of the azblob dependency.

Do you know which version you're referencing in your go.mod?

BICKELC commented 1 year ago

Hi @richardpark-msft, thanks a lot for the fast reply.

Information about the example above with the new eventhub-lib

The example above used the azblob dependency github.com/Azure/azure-storage-blob-go in version v0.15.0.

According to my analysis, the error comes from here: https://github.com/Azure/azure-event-hubs-go/blob/adc9788f66c015a26eebdc5c961efcf0b6d90cfc/eph/eph.go#L414

Additional analysis with old version of eventhub-lib

While analysing the issue, I also reverted the versions in the go.mod file:

    github.com/Azure/azure-event-hubs-go/v3 => github.com/Azure/azure-event-hubs-go/v3 v3.3.18
    github.com/Azure/azure-storage-blob-go => github.com/Azure/azure-storage-blob-go v0.6.0

In addition, I changed the old setup method (in version 3.3.18) as follows:

diff --git a/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go b/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go
index fda6965..39f7178 100644
--- a/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go
+++ b/vendor/github.com/Azure/azure-event-hubs-go/v3/eph/eph.go
@@ -411,7 +411,9 @@ func (h *EventProcessorHost) setup(ctx context.Context) error {
                scheduler := newScheduler(h)

                for _, partitionID := range h.partitionIDs {
-                       h.leaser.EnsureLease(ctx, partitionID)
+                       if _, err := h.leaser.EnsureLease(ctx, partitionID); err != nil {
+                               fmt.Println(err)
+                       }
                        h.checkpointer.EnsureCheckpoint(ctx, partitionID)
                }

Interestingly, I received exactly the same error there (once for each partition). But the old version started without problems because the error was ignored:

-> github.com/Azure/azure-storage-blob-go/azblob.newStorageError, /go/src/XXX/vendor/github.com/Azure/azure-storage-blob-go/azblob/zc_storage_error.go:42
===== RESPONSE ERROR (ServiceCode=BlobAlreadyExists) =====
Description=The specified blob already exists.
RequestId:e1f235cd-a01e-0053-01d5-e9123d000000
Time:2022-10-27T07:24:41.2518645Z, Details:
   Code: BlobAlreadyExists
   PUT https://XXX.blob.core.windows.net/ncsconsumergroup/0?timeout=61
   Authorization: REDACTED
   Content-Length: [80]
   If-None-Match: [*]
   User-Agent: [Azure-Storage/0.6 (go1.19.2; linux)]
   X-Ms-Blob-Cache-Control: []
   X-Ms-Blob-Content-Disposition: []
   X-Ms-Blob-Content-Encoding: []
   X-Ms-Blob-Content-Language: []
   X-Ms-Blob-Content-Type: []
   X-Ms-Blob-Type: [BlockBlob]
   X-Ms-Client-Request-Id: [1f8f5663-66c8-4ec1-5bb1-28d2a0936650]
   X-Ms-Date: [Thu, 27 Oct 2022 07:24:41 GMT]
   X-Ms-Version: [2018-11-09]
   --------------------------------------------------------------------------------
   RESPONSE Status: 409 The specified blob already exists.
   Content-Length: [220]
   Content-Type: [application/xml]
   Date: [Thu, 27 Oct 2022 07:24:40 GMT]
   Server: [Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0]
   X-Ms-Error-Code: [BlobAlreadyExists]
   X-Ms-Request-Id: [e1f235cd-a01e-0053-01d5-e9123d000000]
   X-Ms-Version: [2018-11-09]

richardpark-msft commented 1 year ago

I'm adding in a test now, but I think we're missing handling "409 conflict" as a valid return status. It's an area I'm not super familiar with, so bear with me. :)

BICKELC commented 1 year ago

Thanks a lot for fixing this issue :)

richardpark-msft commented 1 year ago

Just tagged an official release as well: https://github.com/Azure/azure-event-hubs-go/releases/tag/v3.3.20

Thank you for reporting the issue, it really helps us out.