Azure / azure-event-hubs-node

Node client library for Azure Event Hubs https://azure.microsoft.com/services/event-hubs
MIT License

consumer partition assignment and rebalancing #93

Closed davidwmartines closed 6 years ago

davidwmartines commented 6 years ago

Is your feature request related to a problem? Please describe. We need to be able to change the number of concurrent consumer instances in a consumer group, in order to increase or decrease throughput simply by starting or stopping instances, without changing configuration.

Describe the solution you'd like For example, take a hub with 2 partitions. When one EPH process starts, it consumes from partitions 0 and 1. When an additional EPH process starts (in the same consumer group), the first EPH consumes only from partition 0 and the new EPH consumes from partition 1. When the second EPH instance stops, the first EPH instance consumes from partitions 0 and 1 again.

Describe alternatives you've considered

davidwmartines commented 6 years ago

After some more reading, I found that this functionality should already be handled by EPH. It is not working for me, so I assumed it was not implemented, but looking at the debug log I see that the new EPH instance is not able to acquire a lease:

azure:event-hubs:eph:lease-manager   name: 'StorageError',
  azure:event-hubs:eph:lease-manager   message: 'There is already a lease present.\nRequestId:09513e23-201e-013c-6e25-1aab8d000000\nTime:2018-07-12T21:17:20.8290412Z',
  azure:event-hubs:eph:lease-manager   code: 'LeaseAlreadyPresent',
  azure:event-hubs:eph:lease-manager   statusCode: 409,
  azure:event-hubs:eph:lease-manager   requestId: '09513e23-201e-013c-6e25-1aab8d000000' }. Will retry. +190ms

This occurs repeatedly until the process exits with

Error: The connection was inactive for more than the allowed 300000 milliseconds and is closed by container 'LinkTracker'. TrackingId:d441724be67b449884a7ab8425d6e3f2_G0, SystemTracker:gateway5, Timestamp:7/12/2018 9:19:35 PM

amarzavery commented 6 years ago

@davidwmartines - Yes, you are right. The EPH is aggressive in acquiring the lease and maintaining it. It keeps renewing the lease at 80% of the lease duration, thereby starving other EPH instances. Only if it fails to renew the lease for some reason (network issues or other external factors) do other EPH instances get the lease.
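To make the renewal timing concrete, here is a minimal sketch of the arithmetic. It assumes a 30 second lease purely for illustration (Azure Blob Storage leases last between 15 and 60 seconds unless they are infinite), and `renewIntervalMs` is a hypothetical helper, not part of the library.

```typescript
// Hypothetical helper for illustration only; not part of the EPH API.
// Returns how long the current owner waits before renewing its lease.
function renewIntervalMs(leaseDurationSeconds: number, renewFraction: number = 0.8): number {
  if (leaseDurationSeconds <= 0) {
    throw new RangeError("leaseDurationSeconds must be positive");
  }
  if (renewFraction <= 0 || renewFraction >= 1) {
    throw new RangeError("renewFraction must be between 0 and 1 (exclusive)");
  }
  return Math.round(leaseDurationSeconds * 1000 * renewFraction);
}

// Assumed lease duration, chosen only for this example.
const leaseDurationSeconds = 30;
console.log(renewIntervalMs(leaseDurationSeconds)); // 24000
```

With a healthy owner renewing every 24 seconds on a 30 second lease, the lease never lapses, which matches the repeated `LeaseAlreadyPresent` errors seen by the second instance.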

I do not like this design. A better approach would be to use an epoch receiver. Event Hubs will always honor a receiver with a higher epoch value within a consumer group. It will disconnect the current receiver if

An EPH will:

  1. Get all the partitionIds from the Event Hub.
  2. Create a blob container and a storage blob if they do not exist.
  3. Get the blob content from the provided blobs. (This will be repeated at a constant rate, say every ~3 minutes.)
  4. Analyze the blob content and find partitions that it can listen to. (This involves ensuring that the balancing is efficient.)
    1. If there are no partitions to listen to (because there are more EPHs than partitions), then maybe sleep and try again, or maybe throw an error such as "It is not worth trying at the moment due to blah blah reason".
  5. For each partition in the list of partitions that can be taken:
    1. Create an epoch receiver with a higher epoch value than the current receiver. If this fails, no worries (maybe another EPH started receiving before we could).
    2. Acquire the blob lease for that partition (keep renewing the lease at around 80% of the current expiration time). An "already acquired by someone" error should not happen at this point. If it does, we need to think.
    3. Keep checkpointing at regular intervals on behalf of the customer to ensure that the delta is not huge.
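Steps 4 and 5.1 above can be sketched as a pure function over the lease information read from the blobs. This is only an illustration under stated assumptions: the `LeaseRecord` shape, the `expiresAtMs` field, and the `planClaims` name are hypothetical and do not describe the library's actual blob format or API.

```typescript
// Hypothetical shape of the per-partition data read from a lease blob.
interface LeaseRecord {
  partitionId: string;
  owner: string | null; // null means nobody currently owns the partition
  epoch: number;        // epoch used by the last receiver on this partition
  expiresAtMs: number;  // assumed field: when the current lease lapses
}

interface ClaimPlan {
  partitionId: string;
  epoch: number; // epoch to use when opening the new receiver
}

// Find partitions that are unowned or whose lease has lapsed, and pair each
// with an epoch strictly higher than the recorded one, so that the service
// would prefer the new receiver over any stale one.
function planClaims(records: LeaseRecord[], nowMs: number): ClaimPlan[] {
  return records
    .filter((r) => r.owner === null || r.expiresAtMs <= nowMs)
    .map((r) => ({ partitionId: r.partitionId, epoch: r.epoch + 1 }));
}

const leases: LeaseRecord[] = [
  { partitionId: "0", owner: "host-a", epoch: 3, expiresAtMs: 2000 },
  { partitionId: "1", owner: null, epoch: 0, expiresAtMs: 0 },
  { partitionId: "2", owner: "host-b", epoch: 5, expiresAtMs: 500 },
];

// At time 1000, partition 0 is still leased; partitions 1 and 2 can be taken.
console.log(planClaims(leases, 1000));
```

An empty result corresponds to step 4.1: there is nothing to take right now, so the host would sleep and retry.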

Whenever a receiver gets disconnected for any reason:

  1. It will release the lease immediately.

Balancing partitions: This is the trickiest part and I am still thinking about it.
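One possible balancing rule, shown here only as a sketch and not as the approach the library ended up using, is for a host to take a single partition from the host that owns the most whenever that host is at least two partitions ahead of it. The `Ownership` type and `pickPartitionToSteal` function are hypothetical names introduced for this example.

```typescript
// Hypothetical view of who currently owns which partition.
interface Ownership {
  partitionId: string;
  owner: string;
}

// Returns the id of one partition that `me` should take over, or null if the
// distribution is already balanced from this host's point of view.
function pickPartitionToSteal(ownerships: Ownership[], me: string): string | null {
  const counts: Record<string, number> = {};
  counts[me] = 0; // make sure this host is counted even if it owns nothing
  for (const o of ownerships) {
    counts[o.owner] = (counts[o.owner] || 0) + 1;
  }

  // Find the other host that owns the most partitions.
  let biggest: string | null = null;
  for (const host of Object.keys(counts)) {
    if (host !== me && (biggest === null || counts[host] > counts[biggest])) {
      biggest = host;
    }
  }

  // Only steal when it actually improves the balance (a gap of 2 or more).
  if (biggest === null || counts[biggest] - counts[me] < 2) {
    return null;
  }
  const victim = biggest;
  return ownerships.filter((o) => o.owner === victim)[0].partitionId;
}

// The scenario from this issue: host-a owns both partitions, host-b starts.
const ownerships: Ownership[] = [
  { partitionId: "0", owner: "host-a" },
  { partitionId: "1", owner: "host-a" },
];
console.log(pickPartitionToSteal(ownerships, "host-b")); // "0"
```

Stealing one partition at a time and requiring a gap of at least two keeps hosts from passing the same partition back and forth once the counts are within one of each other.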

davidwmartines commented 6 years ago

@amarzavery Thanks for all your great work on this library. Any idea on the timeline for getting rebalancing working?

amarzavery commented 6 years ago

Hi @davidwmartines - Thanks for asking. It is on my list. I am currently testing the event hub client by adding some long running tests (running it for more than 72 hours). I found some bugs in the way I am handling disconnect errors and am fixing them. I plan to start working on rebalancing from today or Monday at the earliest. Implementing rebalancing should not take more than a week. I can provide you some early bits by next Friday July 27. Should that work?

davidwmartines commented 6 years ago

@amarzavery Great! Thank you.

amarzavery commented 6 years ago

@davidwmartines - I have a PR #134 out that fixes this issue. I am doing some more testing of this feature. In the meantime, please feel free to

git clone https://github.com/amarzavery/azure-event-hubs-node.git
cd azure-event-hubs-node/processor
npm i

and use the package locally.

Sorry for the delay in the promised deadline.

amarzavery commented 6 years ago

You can find the multi eph sample over here

amarzavery commented 6 years ago

@davidwmartines - Please install @azure/event-processor-host from npm. That is the stable version of EPH.