Open UpadhyayAbhishek opened 2 weeks ago
Thank you for your feedback. Tagging and routing to the team member best able to assist.
Some things that stand out:
This code will always take you to latest, so you need to change it everywhere you use it.
// NOTE: from your issue, this one definitely will not work to bring you to earliest.
processor, err := azeventhubs.NewProcessor(client, checkpointBlobStore, &azeventhubs.ProcessorOptions{
    LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
    StartPositions: azeventhubs.StartPositions{
        Default: azeventhubs.StartPosition{
            Offset: nil, // Start from the beginning of the event stream
        },
    },
})
If you really do want earliest, there is StartPosition.Earliest, which you can set to to.Ptr(true).
processor, err := azeventhubs.NewProcessor(client, checkpointBlobStore, &azeventhubs.ProcessorOptions{
    LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
    StartPositions: azeventhubs.StartPositions{
        Default: azeventhubs.StartPosition{
            Earliest: to.Ptr(true),
        },
    },
})
We're going to need your verbose Azure SDK logs again. I am looking for all the lines that look like this in your output:
[Oct 21 12:09:47.948173] azeh.Consumer: Creating receiver:
source:aatest/ConsumerGroups/$Default/Partitions/0
instanceID: dd3dd856-840a-4bad-6d12-79f9dfdcf7c7
owner level: 824638397928
offset: amqp.annotation.x-opt-enqueued-time > '0'
manual: false
prefetch: 0
This'll tell me what offset ended up getting used for your individual receivers, and to which partitions. We can trace that backwards to make sure that everything lines up the way we're expecting.
I forgot to include this: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/README.md#logging
This will show you how to enable logging.
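Once verbose logging is enabled, the "Creating receiver" entries can be pulled out of the output mechanically. The following is a minimal sketch in Go using only the standard library. It assumes the log layout matches the sample above (a "Creating receiver:" line followed by "source:" and "offset:" lines). The names receiverInfo and parseReceivers are hypothetical helpers for illustration and are not part of the Azure SDK.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// receiverInfo holds the two fields of a "Creating receiver" log entry
// that matter here. Hypothetical type, not part of the Azure SDK.
type receiverInfo struct {
	Source string // e.g. "aatest/ConsumerGroups/$Default/Partitions/0"
	Offset string // the filter expression the receiver was opened with
}

// parseReceivers scans log text and returns one receiverInfo per
// "Creating receiver:" entry. It assumes the layout of the sample above.
func parseReceivers(logText string) []receiverInfo {
	var out []receiverInfo
	var cur *receiverInfo
	sc := bufio.NewScanner(strings.NewReader(logText))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		switch {
		case strings.Contains(line, "Creating receiver:"):
			// A new entry starts; store the previous one, if any.
			if cur != nil {
				out = append(out, *cur)
			}
			cur = &receiverInfo{}
		case cur != nil && strings.HasPrefix(line, "source:"):
			cur.Source = strings.TrimSpace(strings.TrimPrefix(line, "source:"))
		case cur != nil && strings.HasPrefix(line, "offset:"):
			cur.Offset = strings.TrimSpace(strings.TrimPrefix(line, "offset:"))
		}
	}
	if cur != nil {
		out = append(out, *cur)
	}
	return out
}

func main() {
	sample := `[Oct 21 12:09:47.948173] azeh.Consumer: Creating receiver:
source:aatest/ConsumerGroups/$Default/Partitions/0
instanceID: dd3dd856-840a-4bad-6d12-79f9dfdcf7c7
owner level: 824638397928
offset: amqp.annotation.x-opt-enqueued-time > '0'
manual: false
prefetch: 0`
	for _, r := range parseReceivers(sample) {
		fmt.Printf("%s -> %s\n", r.Source, r.Offset)
	}
}
```

Running this over the full log gives one line per receiver, which makes it easy to see which partitions were opened with which offset expression.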
Hi @UpadhyayAbhishek. Thank you for opening this issue and giving us the opportunity to assist. To help our team better understand your issue and the details of your scenario please provide a response to the question asked above or the information requested above. This will help us more accurately address your issue.
@richardpark-msft
func deleteCheckpointContainer(storageConnStr, containerName string) error {
    blobClient, err := azblob.NewClientFromConnectionString(storageConnStr, nil)
    if err != nil {
        return err
    }
This function deletes the checkpoint, and the code is then able to pull the previously existing data. But when we restart the code, it deletes the checkpoint again and pulls the data from the start again. The code should pull the existing data only once, and should never pull it again even if we restart the code.
Kindly share your thoughts on this.
Yes, understood. However, since things aren't working, there must be some additional problem occurring, which is why I need to see the logs. Instructions for that are here: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/README.md#logging
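For context on the restart behavior described above: as far as the SDK documentation describes it, the Processor uses StartPositions only for partitions that have no checkpoint in the checkpoint store. The toy model below is a sketch of that precedence using only the Go standard library; it is not SDK code, and chooseStart and the in-memory map standing in for the blob checkpoint store are hypothetical.

```go
package main

import "fmt"

// chooseStart is a toy model (not SDK code) of how a start position is
// picked for one partition: a stored checkpoint wins, and the configured
// default is used only when no checkpoint exists.
func chooseStart(checkpoints map[string]string, partitionID, defaultStart string) string {
	if offset, ok := checkpoints[partitionID]; ok {
		return "checkpoint:" + offset
	}
	return defaultStart
}

func main() {
	// Stands in for the blob checkpoint store (assumption for illustration).
	checkpoints := map[string]string{}

	// First run: nothing stored, so the default (earliest) applies.
	fmt.Println(chooseStart(checkpoints, "0", "earliest"))

	// Events were processed and a checkpoint was written.
	checkpoints["0"] = "1024"
	fmt.Println(chooseStart(checkpoints, "0", "earliest"))

	// Deleting the checkpoint at startup discards that progress,
	// so the default applies again and old events are re-read.
	delete(checkpoints, "0")
	fmt.Println(chooseStart(checkpoints, "0", "earliest"))
}
```

Under this model, deleting the checkpoint container on every start would be expected to replay the stream each time, while leaving the checkpoints in place would let a restart resume where it stopped. Whether that fully explains this issue still depends on what the logs show.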
Hi @UpadhyayAbhishek, we're sending this friendly reminder because we haven't heard back from you in 7 days. We need more information about this issue to help address it. Please be sure to give us your input. If we don't hear back from you within 14 days of this comment the issue will be automatically closed. Thank you!
Bug Report
Hi Team,
We are using the SDK below to pull data from Azure Event Hubs: https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs
This SDK is able to pull the live logs coming into the event hub, but it is not pulling the logs that are already available in the event hub.
However, the previous SDK, github.com/Azure/azure-event-hubs-go/eph, used to pull all available logs in the event hub.
Below is the code with the new SDK.
We are using multiple instances of azeventhubs.StartPosition. Please find a few code snippets below.
1)
processor, err := azeventhubs.NewProcessor(client, checkpointBlobStore, &azeventhubs.ProcessorOptions{
    LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
    StartPositions: azeventhubs.StartPositions{
        Default: azeventhubs.StartPosition{
            Offset: nil, // Start from the beginning of the event stream
        },
    },
})
2)
processor, err := azeventhubs.NewProcessor(client, checkpointBlobStore, &azeventhubs.ProcessorOptions{
    LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy,
    StartPositions: azeventhubs.StartPositions{
        Default: azeventhubs.StartPosition{
            EnqueuedTime: time.Unix(0, 0), // Use a very early timestamp (epoch time)
        },
    },
})
Kindly share your thoughts on this.