Azure / azure-sdk-for-go

Event Hubs: azure-sdk-for-go is not working with azure connection string #23379

Closed UpadhyayAbhishek closed 4 weeks ago

UpadhyayAbhishek commented 2 months ago

We are using the code available here, but it is not working properly: logs arrive only sporadically. Please advise.

github-actions[bot] commented 2 months ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

jhendrixMSFT commented 2 months ago

Can you please provide additional info on what's not working? Code samples might help too if possible.

github-actions[bot] commented 2 months ago

Hi @UpadhyayAbhishek. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.

UpadhyayAbhishek commented 2 months ago

Hi Team,

Thanks for your reply. Below is the code that is not working. We also substituted known-working Event Hubs and storage credentials and rechecked with those. The code is based on the official documentation.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
    "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    if err != nil {
        panic(err)
    }
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)
    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

richardpark-msft commented 2 months ago

@UpadhyayAbhishek, that code looks fine. Can you share the actual error messages you're seeing?

UpadhyayAbhishek commented 1 month ago

Can we get an update on this?

jhendrixMSFT commented 1 month ago

@UpadhyayAbhishek we're waiting for more info from you. Specifically, can you please provide error messages etc that you're observing. From the code sample you provided, nothing looks incorrect. Also, if you can include the module version(s) you're using that would help too.

UpadhyayAbhishek commented 1 month ago

We are not getting any error, though we are not able to pull events from Azure Event Hubs. When we were using the previous eventprocessorhost library, we did receive data from Azure Event Hubs.

jhendrixMSFT commented 1 month ago

If I understand correctly, you're not receiving any events with the above code, is that correct? What versions of azeventhubs and azblob are you using? Also, what's the previous event processor you were using?

richardpark-msft commented 1 month ago

@UpadhyayAbhishek, based off what you mention you might want to tune this bit of code:

receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

The way it's written now, it'll return after 1 minute or once 100 events arrive, whichever comes first. If you have fewer than 100 events in a minute, your code will end up waiting for the full minute. Try tuning that down to a smaller interval (say, 5 * time.Second), or a lower number of messages (or both).

github-actions[bot] commented 1 month ago

Hi @UpadhyayAbhishek, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!

UpadhyayAbhishek commented 1 month ago

receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), 5*time.Second)
events, err := partitionClient.ReceiveEvents(receiveCtx, 10, nil)

Tried with suggested code but still getting the same error.

Error

panic: dial tcp 172.191.248.146:5671: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.

goroutine 1 [running]:
main.main()
    C:/Users/abhishek.upadhyay2/Desktop/Code/latestEHB/main.go:66 +0x1f8
exit status 2

UpadhyayAbhishek commented 1 month ago

Can we have an update on this?

jhendrixMSFT commented 1 month ago

It appears that the client couldn't connect to the service. Can you please enable logging and provide the full logs? You can find instructions here.

UpadhyayAbhishek commented 4 weeks ago

@jhendrixMSFT Thanks for the response, we have enabled the logs and below are the logs we got.

[Event: Response] ==> REQUEST/RESPONSE (Try=1/59.0211ms, OpTime=59.6602ms) -- RESPONSE RECEIVED
   PUT https://beatsautomatioinstorage.blob.core.windows.net/prod101/beatsautomationspace1.servicebus.windows.net%2Finsights-logs-operationallogs%2F$Default%2Fownership%2F1?comp=REDACTED
   Accept: application/xml
   Authorization: REDACTED
   If-Match: 0x8DCE76B7B0BD751
   User-Agent: azsdk-go-azblob/v1.4.0 (go1.22.4; Windows_NT)
   X-Ms-Date: REDACTED
   x-ms-meta-ownerid: REDACTED
   x-ms-version: REDACTED
   --------------------------------------------------------------------------------
   RESPONSE Status: 200 OK
   Content-Length: 0
   Date: Tue, 08 Oct 2024 07:33:27 GMT
   Etag: "0x8DCE76B804CA38C"
   Last-Modified: Tue, 08 Oct 2024 07:33:27 GMT
   Server: Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0
   X-Ms-Request-Id: 30775d01-401e-0039-3554-19d0c3000000
   X-Ms-Request-Server-Encrypted: REDACTED
   X-Ms-Version: REDACTED

[Event: Request] ==> OUTGOING REQUEST (Try=1)
   PUT https://beatsautomatioinstorage.blob.core.windows.net/prod101/beatsautomationspace1.servicebus.windows.net%2Finsights-logs-operationallogs%2F$Default%2Fownership%2F2?comp=REDACTED
   Accept: application/xml
   Authorization: REDACTED
   If-Match: 0x8DCE76B7B156978
   User-Agent: azsdk-go-azblob/v1.4.0 (go1.22.4; Windows_NT)
   X-Ms-Date: REDACTED
   x-ms-meta-ownerid: REDACTED
   x-ms-version: REDACTED

Kindly share your thoughts on this.

jhendrixMSFT commented 4 weeks ago

This is all the logs? I would have thought there to be more. It's after this last request to blob storage that you see the connection error?

UpadhyayAbhishek commented 4 weeks ago

Yes, these are the logs that keep repeating. It seems this library is not updating the checkpoint, since it is not pulling the existing data from Azure Event Hubs.

richardpark-msft commented 4 weeks ago

receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), 5*time.Second)
events, err := partitionClient.ReceiveEvents(receiveCtx, 10, nil)

Tried with suggested code but still getting the same error.

Error

panic: dial tcp 172.191.248.146:5671: connectex: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond.

goroutine 1 [running]:
main.main()
    C:/Users/abhishek.upadhyay2/Desktop/Code/latestEHB/main.go:66 +0x1f8
exit status 2

This indicates you have network connectivity issues, which would explain why you're receiving events sporadically - we retry on network failures and those introduce delays.

This looks more like an infrastructure issue than an SDK issue. I'm going to close this, but please reopen if you resolve your connectivity issues and get further errors.