Azure / azure-event-hubs-go

Golang client library for Azure Event Hubs https://azure.microsoft.com/services/event-hubs
MIT License

Getting Checkpoint Offset Information #221

Open mplachter opened 3 years ago

mplachter commented 3 years ago

Expected Behavior

Hi Team.

I'm wondering if there is an API, like the one in Kafka, to pull out the current offset values of all consumers in a consumer group for a given topic and compare them with the current (latest) offset of each partition of that topic.

I'm trying to find a way to extract data that shows whether there is back pressure on a particular consumer in a consumer group for a given topic or topics.

Actual Behavior

In Kafka, for example, you can call FetchOffset with an OffsetFetchRequest to get back offset information for a given consumer group on the given partitions.

https://github.com/Shopify/sarama/blob/master/broker.go#L372-L383
https://github.com/Shopify/sarama/blob/master/offset_fetch_request.go#L3-L8
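For reference, the Kafka-side number being described is usually called consumer lag: for each partition, the high-water mark (the offset the next produced message will receive) minus the offset the consumer group has committed. A minimal sketch, assuming both values were already fetched (for example with Sarama's Client.GetOffset(topic, partition, sarama.OffsetNewest) and an OffsetFetchRequest); ConsumerLag and PartitionLag are hypothetical names, not part of any library.

```go
package main

import "sort"

// PartitionLag is the lag of one consumer group on one partition.
// (Hypothetical type for illustration.)
type PartitionLag struct {
	Partition int32
	Lag       int64
}

// ConsumerLag returns highWater[p] - committed[p] for every partition that has
// a committed offset, sorted by partition number.
// A missing entry or a negative committed offset (an OffsetFetch response uses
// -1 for "no commit") is skipped, because that lag depends on the group's
// offset-reset policy. Negative lag from a stale high-water mark is clamped to 0.
func ConsumerLag(highWater, committed map[int32]int64) []PartitionLag {
	lags := make([]PartitionLag, 0, len(committed))
	for p, hw := range highWater {
		c, ok := committed[p]
		if !ok || c < 0 {
			continue
		}
		lag := hw - c
		if lag < 0 {
			lag = 0
		}
		lags = append(lags, PartitionLag{Partition: p, Lag: lag})
	}
	sort.Slice(lags, func(i, j int) bool { return lags[i].Partition < lags[j].Partition })
	return lags
}
```

A consumer whose lag keeps growing across samples is the one experiencing back pressure.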

Environment

devigned commented 3 years ago

I don't think this API exists for EventHubs, but I know it's been asked for in the past. Currently, this repo offers something called the Event Processor Host, which will balance consumers within a group and persist the checkpoints to Azure Storage. I think you could read those checkpoints to determine the current position of a consumer in the consumer group.
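Reading those checkpoints could look roughly like this. The sketch assumes you have already downloaded one lease blob per partition from the storage container (with the Azure Storage blob SDK) and that each blob holds JSON with a nested checkpoint object. The field names below (partitionID, owner, checkpoint, offset, sequenceNumber, enqueueTime) are an assumption modeled on this repo's storage checkpointer, so compare them with a real blob before relying on them; ParseCheckpoint is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"time"
)

// StoredCheckpoint mirrors the checkpoint portion of a lease blob.
// ASSUMPTION: JSON field names may differ between library versions.
type StoredCheckpoint struct {
	Offset         string    `json:"offset"`
	SequenceNumber int64     `json:"sequenceNumber"`
	EnqueueTime    time.Time `json:"enqueueTime"`
}

// leaseBlob is the assumed top-level shape of one partition's blob.
type leaseBlob struct {
	PartitionID string            `json:"partitionID"`
	Owner       string            `json:"owner"`
	Checkpoint  *StoredCheckpoint `json:"checkpoint"`
}

// ParseCheckpoint decodes one lease blob body and returns its partition ID and
// checkpoint. The checkpoint is nil if the consumer has never checkpointed.
func ParseCheckpoint(body []byte) (string, *StoredCheckpoint, error) {
	var lb leaseBlob
	if err := json.Unmarshal(body, &lb); err != nil {
		return "", nil, err
	}
	return lb.PartitionID, lb.Checkpoint, nil
}
```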

To determine the current offset for a given partition, I believe you should be able to use: https://github.com/Azure/azure-event-hubs-go/blob/0eb7b61636108a7919073ce314ca25f075c95431/hub.go#L523-L546

It returns the last enqueued offset along with other partition runtime information. https://github.com/Azure/azure-event-hubs-go/blob/0eb7b61636108a7919073ce314ca25f075c95431/amqp_mgmt.go#L65-L74
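Combining the two gives a per-partition backlog. A sketch, assuming info carries the BeginningSequenceNumber and LastSequenceNumber fields from the HubPartitionRuntimeInformation returned by hub.GetPartitionInformation(ctx, partitionID) (mirrored here in a local struct so the example has no dependencies), and assuming the checkpoint records the sequence number of the last processed event. Sequence numbers are used rather than LastEnqueuedOffset because offsets are strings in this API and don't translate directly into an event count. EventsBehind is a hypothetical helper.

```go
package main

// partitionInfo mirrors the fields of eventhub.HubPartitionRuntimeInformation
// used below. (Local copy for illustration only.)
type partitionInfo struct {
	PartitionID             string
	BeginningSequenceNumber int64
	LastSequenceNumber      int64
}

// EventsBehind returns how many events are enqueued after the consumer's
// checkpoint. If hasCheckpoint is false, every event still retained in the
// partition counts as unprocessed. The result is never negative.
func EventsBehind(info partitionInfo, checkpointSeq int64, hasCheckpoint bool) int64 {
	var behind int64
	if hasCheckpoint {
		behind = info.LastSequenceNumber - checkpointSeq
	} else {
		behind = info.LastSequenceNumber - info.BeginningSequenceNumber + 1
	}
	if behind < 0 {
		return 0
	}
	return behind
}
```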

mplachter commented 3 years ago

Thanks David.

So it seems that Event Hubs doesn't actually keep track of the offsets per consumer group per partition? That seems to be offloaded to some kind of checkpointer, the popular one being the Blob Storage checkpointer?
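In the model described above, the service stores the events and each consumer persists its own position. A hypothetical, in-memory illustration of that split (CheckpointStore and MemoryStore are not this repo's API): positions are keyed by consumer group and partition, and a Blob Storage checkpointer would persist the same keys durably instead of in process memory.

```go
package main

import "sync"

// CheckpointStore is a hypothetical interface for illustration only.
type CheckpointStore interface {
	Save(consumerGroup, partitionID string, sequenceNumber int64)
	Load(consumerGroup, partitionID string) (int64, bool)
}

// key identifies one consumer group's position on one partition.
type key struct{ group, partition string }

// MemoryStore keeps checkpoints in memory, guarded by a mutex so several
// partition receivers can checkpoint concurrently.
type MemoryStore struct {
	mu sync.Mutex
	m  map[key]int64
}

func NewMemoryStore() *MemoryStore { return &MemoryStore{m: make(map[key]int64)} }

// Save records the last processed sequence number for the group and partition.
func (s *MemoryStore) Save(group, partition string, seq int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key{group, partition}] = seq
}

// Load returns the stored sequence number and whether one exists.
func (s *MemoryStore) Load(group, partition string) (int64, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	seq, ok := s.m[key{group, partition}]
	return seq, ok
}

// Compile-time check that MemoryStore satisfies CheckpointStore.
var _ CheckpointStore = (*MemoryStore)(nil)
```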

Interestingly, when you use the Sarama client over the Kafka API for Event Hubs, managing offsets with checkpointers goes away; it seems to be handled somewhere in the background, which leads me to believe there should be a way to read that information back out.

devigned commented 3 years ago

> Interestingly enough when you use the Sarama Client over the Kafka API for EventHubs the management of the Offsets using CheckPointers goes away and this seems to be offloaded somewhere magically in the background... Which leads me to believe there should be a way to read the information out of it somehow.

I'm 90% sure something exists for the Kafka API on EH since most clients wouldn't work without it. It's such a key feature for partitioned consumers. I'm not sure it's exposed for regular EventHubs, but I really wish it was. It would make building partitioned consumers so much easier.

Perhaps someone from that team could comment. @jhendrixMSFT, do you know who the right person to pull in from EH would be?

jhendrixMSFT commented 3 years ago

Adding @richardpark-msft as he'll be working on EH in the future.

mplachter commented 3 years ago

The biggest issue is that when I try to get a list of groups back from the broker (i.e., the Azure Event Hubs URI), it returns a nil map of groups.

That makes me think the Kafka API's mapping of consumer groups back to Event Hubs isn't set up correctly.

mplachter commented 3 years ago

Here is an example using Sarama:

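    package main

    import (
        "fmt"

        "github.com/Shopify/sarama"
    )

    // getConfig was not part of the original snippet; this placeholder only
    // sets a protocol version. Add the SASL/TLS settings your endpoint needs.
    func getConfig() *sarama.Config {
        config := sarama.NewConfig()
        config.Version = sarama.V1_0_0_0
        return config
    }

    func main() {
        c, err := sarama.NewClient([]string{"localhost:9092"}, getConfig())
        if err != nil {
            panic(err)
        }
        defer c.Close()

        // Refresh the cluster metadata so Brokers() is up to date
        if err := c.RefreshMetadata(); err != nil {
            panic(err)
        }

        for _, broker := range c.Brokers() {
            // Open a connection to the broker; an existing connection is fine
            if err := broker.Open(c.Config()); err != nil && err != sarama.ErrAlreadyConnected {
                panic(err)
            }

            // Listing the groups returns a nil map of consumer groups;
            // this doesn't return anything when connecting to Event Hubs
            resp, err := broker.ListGroups(&sarama.ListGroupsRequest{})
            if err != nil {
                panic(err)
            }
            if resp.Err != sarama.ErrNoError {
                panic(resp.Err)
            }
            fmt.Printf("broker %d: %d groups %v\n", broker.ID(), len(resp.Groups), resp.Groups)

            // Close explicitly; a defer inside the loop would hold every
            // connection open until main returns
            broker.Close()
        }
    }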