minghuaw / azeventhubs

Unofficial Azure Event Hubs SDK over AMQP 1.0 for rust

Implement `EventHubProcessor` #1

Open minghuaw opened 1 year ago

minghuaw commented 1 year ago

tokio::task::JoinSet might be useful

minghuaw commented 9 months ago

A short guide

A short guide is provided below for folks that are interested in working on this issue.

The Event Processor client library is a companion to the Azure Event Hubs client library, providing a stand-alone client for consuming events in a robust, durable, and scalable way that is suitable for the majority of production scenarios.

https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventhubs.processor-readme?view=azure-dotnet

Related resources

  1. Azure Event Hubs Event Processor client library for .NET (https://github.com/Azure/azure-sdk-for-net/tree/main/sdk/eventhub/Azure.Messaging.EventHubs.Processor)
  2. Azure Storage Blobs (https://crates.io/crates/azure_storage_blobs)
  3. Azure Identity (https://crates.io/crates/azure_identity)
  4. tokio (https://docs.rs/tokio/latest/tokio/index.html)

Event Processor

The processor client should

... manage the responsibilities associated with connecting to a given Event Hub and processing events from each of its partitions, in the context of a specific consumer group. The act of processing events read from the partition and handling any errors that occur is delegated by the event processor to code that you provide ...

... Checkpointing is the responsibility of the consumer and occurs on a per-partition, typically in the context of a specific consumer group. For the EventProcessorClient, this means that, for a consumer group and partition combination, the processor must keep track of its current position in the event stream ...

https://learn.microsoft.com/en-us/dotnet/api/overview/azure/messaging.eventhubs.processor-readme?view=azure-dotnet#key-concepts

Let's break this apart.

1. Connecting to Azure Event Hubs

Connecting to Azure Event Hubs to create a consumer, using either a connection string or another supported Azure Identity credential, is already handled by EventHubConnection (https://docs.rs/azeventhubs/latest/azeventhubs/struct.EventHubConnection.html or https://github.com/minghuaw/azeventhubs/blob/main/src/event_hubs_connection.rs), which simply wraps an AmqpClient (https://github.com/minghuaw/azeventhubs/blob/main/src/amqp/amqp_client.rs).

Because the .NET SDK uses Azure Storage Blobs as the durable data store for checkpointing, the azure_storage_blobs crate (https://docs.rs/azure_storage_blobs/latest/azure_storage_blobs/) could be used for the same purpose here.

2. Processing events from the partitions

Setting aside load balancing across multiple processor clients running in separate processes or on separate machines, the processor client should allow the user to register an event processing function (probably an async function), something similar to the following:

pub fn process_event_handler<F, Fut>(f: F)
where
    F: Fn(ProcessEventArgs) -> Fut,
    Fut: std::future::Future<Output = Result<(), ProcessorError>>
{
    todo!()
}

ProcessEventArgs is a new type that should give the user a reference to the underlying message (https://docs.rs/azeventhubs/latest/azeventhubs/struct.ReceivedEventData.html or https://github.com/minghuaw/azeventhubs/blob/46a4c32b19445f247247f53e3443065871bc2c66/src/event_data.rs#L100).

The error type ProcessorError could simply be azure_core::Error, or it could be made generic so that users can choose their own error type. Either way, the error type will probably need to be convertible into ProcessErrorEventArgs (see below).
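
One way the processor could hold on to such a handler is to box it behind a type-erased closure. The sketch below is only an illustration of that idea: EventProcessorBuilder, the fields of ProcessEventArgs, the tuple-struct ProcessorError, and has_event_handler are all hypothetical names, not part of azeventhubs.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical placeholder; the real type would expose a reference to
// azeventhubs' ReceivedEventData rather than raw bytes.
pub struct ProcessEventArgs {
    pub partition_id: String,
    pub body: Vec<u8>,
}

// Hypothetical placeholder for the handler's error type.
#[derive(Debug)]
pub struct ProcessorError(pub String);

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
type EventHandler =
    Box<dyn Fn(ProcessEventArgs) -> BoxFuture<Result<(), ProcessorError>> + Send + Sync>;

// Hypothetical builder that stores the user's handler.
#[derive(Default)]
pub struct EventProcessorBuilder {
    event_handler: Option<EventHandler>,
}

impl EventProcessorBuilder {
    /// Accepts any async function or closure and erases its concrete type
    /// so it can live in a struct field and be shared across tasks.
    pub fn process_event_handler<F, Fut>(mut self, f: F) -> Self
    where
        F: Fn(ProcessEventArgs) -> Fut + Send + Sync + 'static,
        Fut: Future<Output = Result<(), ProcessorError>> + Send + 'static,
    {
        let boxed: EventHandler = Box::new(move |args: ProcessEventArgs| {
            let fut: BoxFuture<Result<(), ProcessorError>> = Box::pin(f(args));
            fut
        });
        self.event_handler = Some(boxed);
        self
    }

    pub fn has_event_handler(&self) -> bool {
        self.event_handler.is_some()
    }
}
```

The Send + Sync + 'static bounds are an assumption made so that the handler can be invoked from spawned tasks; a single-threaded design could relax them.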

3. Handling errors

The processor client should then allow the user to add a handler for processing errors (whether this should be async is debatable):

pub fn process_error_handler<F, Fut>(f: F)
where
    F: Fn(ProcessErrorEventArgs) -> Fut,
    Fut: std::future::Future<Output = Result<(), ProcessorErrorEventError>>
{
    todo!()
}

ProcessErrorEventArgs is a new type that contains the error returned from the event handler. Errors with the underlying connection that are not fixed by retrying, which surface as azure_core::Error, should probably be delivered here as well. The result returned by the error handler might be used to indicate whether the processor should stop when an error cannot be handled (debatable).
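
To make the two error sources concrete, here is a rough sketch of what ProcessErrorEventArgs might look like. Everything in it is an assumption for illustration: ErrorSource, ConnectionError (standing in for azure_core::Error), the partition_id field, and should_continue are hypothetical, and the handler is kept synchronous purely to keep the sketch short.

```rust
// Hypothetical error returned by the user's event handler.
#[derive(Debug)]
pub struct ProcessorError(pub String);

// Hypothetical stand-in for a non-retryable connection error
// (the role suggested for azure_core::Error).
#[derive(Debug)]
pub struct ConnectionError(pub String);

// Hypothetical error returned by the error handler itself.
#[derive(Debug)]
pub struct ProcessorErrorEventError(pub String);

#[derive(Debug)]
pub enum ErrorSource {
    Handler(ProcessorError),
    Connection(ConnectionError),
}

#[derive(Debug)]
pub struct ProcessErrorEventArgs {
    // None when the error is not tied to a single partition.
    pub partition_id: Option<String>,
    pub source: ErrorSource,
}

// Lets an event-handler error be turned into error-handler arguments.
impl From<ProcessorError> for ProcessErrorEventArgs {
    fn from(err: ProcessorError) -> Self {
        Self { partition_id: None, source: ErrorSource::Handler(err) }
    }
}

/// Runs the error handler and reports whether processing should continue.
/// Treating `Err` as "stop the processor" is one possible policy, not a
/// settled design.
pub fn should_continue<H>(handler: H, args: ProcessErrorEventArgs) -> bool
where
    H: Fn(ProcessErrorEventArgs) -> Result<(), ProcessorErrorEventError>,
{
    handler(args).is_ok()
}
```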

4. Start processing

Let's not consider load balancing across multiple clients yet. Starting the processor probably amounts to first obtaining the list of partitions and then spawning a tokio task for each one. A tokio::task::JoinSet is probably useful here so that we don't need to keep track of each task handle manually.
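
The fan-out and fan-in shape can be sketched without any dependencies by using OS threads as a stand-in for tokio tasks. This is not the proposed implementation: process_partition and run_all are hypothetical names, and in a tokio version each thread::spawn would become a JoinSet::spawn, with the collection loop replaced by repeated join_next().await calls.

```rust
use std::thread;

// Hypothetical per-partition worker. A real one would open a consumer for
// the partition and loop over incoming events; this one returns immediately.
fn process_partition(partition_id: String) -> Result<String, String> {
    Ok(partition_id)
}

/// Starts one worker per partition and waits for all of them to finish.
pub fn run_all(partition_ids: Vec<String>) -> Vec<Result<String, String>> {
    // Fan out: one worker per partition.
    let handles: Vec<_> = partition_ids
        .into_iter()
        .map(|id| thread::spawn(move || process_partition(id)))
        .collect();

    // Fan in: a panicking worker is reported as an error instead of
    // bringing down the whole processor.
    handles
        .into_iter()
        .map(|h| {
            h.join()
                .unwrap_or_else(|_| Err("partition worker panicked".to_string()))
        })
        .collect()
}
```

One difference worth noting: joining thread handles yields results in spawn order, whereas JoinSet yields them in completion order, which lets the processor react to the first partition task that exits.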

5. Checkpointing

Each event processing task should periodically create checkpoints with Azure Storage Blobs (https://docs.rs/azure_storage_blobs/latest/azure_storage_blobs/) or possibly some other kind of persistent storage (by abstracting this into a public trait). This may also have implications for consumer auto recovery, which is something to consider later.
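
As a starting point for that abstraction, the trait could look roughly like the sketch below. The names CheckpointStore, Checkpoint, and InMemoryCheckpointStore, as well as the checkpoint fields, are assumptions for illustration. The methods are synchronous only to keep the sketch dependency-free; a blob-backed store would almost certainly need async methods.

```rust
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Mutex;

// Hypothetical checkpoint record: the position reached in one partition
// for one consumer group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Checkpoint {
    pub consumer_group: String,
    pub partition_id: String,
    pub offset: i64,
    pub sequence_number: i64,
}

// Hypothetical storage abstraction; Azure Storage Blobs would be one backend.
pub trait CheckpointStore {
    type Error;

    fn update_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Self::Error>;

    fn get_checkpoint(
        &self,
        consumer_group: &str,
        partition_id: &str,
    ) -> Result<Option<Checkpoint>, Self::Error>;
}

// In-memory backend, useful for tests; nothing survives a restart.
#[derive(Default)]
pub struct InMemoryCheckpointStore {
    inner: Mutex<HashMap<(String, String), Checkpoint>>,
}

impl CheckpointStore for InMemoryCheckpointStore {
    type Error = Infallible;

    fn update_checkpoint(&self, checkpoint: Checkpoint) -> Result<(), Self::Error> {
        let key = (checkpoint.consumer_group.clone(), checkpoint.partition_id.clone());
        // Recover the map even if another thread panicked while holding the lock.
        let mut map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        map.insert(key, checkpoint);
        Ok(())
    }

    fn get_checkpoint(
        &self,
        consumer_group: &str,
        partition_id: &str,
    ) -> Result<Option<Checkpoint>, Self::Error> {
        let key = (consumer_group.to_string(), partition_id.to_string());
        let map = self.inner.lock().unwrap_or_else(|e| e.into_inner());
        Ok(map.get(&key).cloned())
    }
}
```

Keying by (consumer group, partition) mirrors the quoted description that checkpoints are tracked per consumer group and partition combination.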

Other things to think about

  1. How is checkpointing used for load balancing across multiple processor clients? Does a link get stolen when load balancing happens? If so, we will need to deal with this issue (#35).
  2. How should auto recovery behave if Azure Storage Blobs is used for checkpointing?

minghuaw commented 9 months ago

This could be implemented as a separate crate. It might be worth moving everything related to AMQP in azeventhubs into a separate shared crate.