Azure / azure-sdk-for-net

This repository is for active development of the Azure SDK for .NET. For consumers of the SDK we recommend visiting our public developer docs at https://learn.microsoft.com/dotnet/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-net.

[FEATURE REQ] Allow advanced customization using `EventProcessorClient` #44390

Closed shlomiassaf closed 4 months ago

shlomiassaf commented 4 months ago

Library name

Azure.Messaging.EventHubs

Please describe the feature.

The Event Hubs EventProcessorClient is currently the only concrete event processor the SDK provides.

Similar to ServiceBus, the SDK provides the core operational functionality for working with events, including thread management (especially noticeable in the ServiceBus case).

Users of the SDK can extend the functionality, e.g. by adding middleware support, typed handling of events, serialization, and so on. These are framework concerns, not SDK material.

When adding customized features on top of the event processing, composition is much more powerful than inheritance (IMHO).

I've extended the ServiceBus library into a framework using composition. It is possible mainly because of the event design provided by the processor client.

For Event Hubs, EventProcessorClient is restrictive, while EventProcessor and PluggableCheckpointStoreEventProcessor do not follow the proper Azure SDK conventions.

1) EventProcessorClient implements the proper diagnostics
2) EventProcessorClient implements the proper scoping for diagnostics, i.e. proper telemetry and tracing
3) EventProcessorClient implements the same design as in ServiceBus, which gives developers a common model

It would be a pain to implement a custom client from EventProcessor, and sometimes nearly impossible to replicate, as some things are hidden in EventProcessor. For example, ClientDiagnostics is internal in EventProcessor.

The core limitation of EventProcessorClient comes from 2 points:

1) The fixed use of Blob Storage for checkpoints
2) The forced single-threaded processing per topic/partition/consumer group

Now, (2) is understandable: since the checkpoint implementation is not customizable, you must force a single thread.

However (1), 😮 ? I wonder why BlobStorage was forced? (Promoting usage of Azure Storage maybe?)

This design decision is somewhat strange; it dramatically limits the possibilities offered by Event Hubs.

What is the motivation for preventing the ability to provide a custom CheckpointStore?

With a custom CheckpointStore we can easily implement reliable consumers through transactions, e.g. MongoDB/SQL stores which allow transactional operations on both the domain data and the offset.

It can also allow implementation of multi-threaded handling of events using a managed checkpoint logic which handles X amount of events in parallel and commits them in order.
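
A minimal sketch of the "commit in order" part of that idea, assuming sequence numbers within a partition increase by exactly one per event. `OrderedCheckpointTracker` is a hypothetical helper, not a type from Azure.Messaging.EventHubs. Handlers may finish in any order, and the tracker only reports a position once every earlier event has completed:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Azure.Messaging.EventHubs.
// Tracks events that finish out of order and reports the highest
// sequence number that is safe to checkpoint.
public sealed class OrderedCheckpointTracker
{
    private readonly object _gate = new object();
    private readonly SortedSet<long> _completed = new SortedSet<long>();
    private long _lastCommitted;

    // lastCommitted: sequence number of the last checkpointed event.
    public OrderedCheckpointTracker(long lastCommitted)
    {
        _lastCommitted = lastCommitted;
    }

    // Records a finished event. Returns the new checkpoint position if the
    // contiguous run of completed events advanced, otherwise null.
    public long? Complete(long sequenceNumber)
    {
        lock (_gate)
        {
            if (sequenceNumber <= _lastCommitted)
            {
                return null; // duplicate, or already covered by a checkpoint
            }

            _completed.Add(sequenceNumber);

            // Assumption: sequence numbers are contiguous (step of 1).
            var advanced = false;
            while (_completed.Remove(_lastCommitted + 1))
            {
                _lastCommitted++;
                advanced = true;
            }

            return advanced ? _lastCommitted : (long?)null;
        }
    }
}
```

With a tracker created at position 10, completing event 12 first returns null (11 is still in flight), completing 11 then returns 12, and only that returned value would be handed to whatever checkpoint store is in use.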

Other things also come to mind, like outboxing, or using Redis-like infrastructure for a very fast checkpoint store, etc.

It seems odd to me as the alternative (Confluent Kafka) "checkpointing" is seamless, an integral part of the product, and as such, limiting the solution to blob is just not right.

Again, I understand there's EventProcessor, which we can inherit from, but its abstraction is massive and uses mostly private and internal modifiers.

Why not have EventProcessorClient accept an implementation of a CheckpointStore, and also provide EventProcessorBlobStoreClient as a ready-made blob solution?

In fact, it seems natural that

This is actually how most of the Azure SDK libraries are designed! To make the claim stronger, ProcessEventArgs is part of Azure.Messaging.EventHubs but never used in it!

You can also throw in lots of additional stores:

And why not even promote FASTER with an Azure.Messaging.EventHubs.Faster implementation?

If interested, I can create a PR with an initial implementation that only allows customization of the CheckpointStore.
In the next major version, the blob implementation should get extracted to its own library.

Thanks!

github-actions[bot] commented 4 months ago

Thank you for your feedback. Tagging and routing to the team member best able to assist.

jsquire commented 4 months ago

Hi @shlomiassaf. Thank you for reaching out and for your suggestions. The EventProcessorClient is intentionally an opinionated implementation on top of EventProcessor<T> bound to Blob Storage and we have no plans to change that.

All of the core functionality of EventProcessorClient is implemented in its lower-level ancestors, PluggableCheckpointStoreEventProcessor and EventProcessor<T>. EventProcessorClient extends this to add the .NET event handlers, unroll the batch for single dispatch, and the Blob Storage integration. There's nothing that EventProcessorClient does that is not available in the base class hierarchy - which includes the OTel diagnostics spans.

While we are open to providing additional checkpoint stores and specialized processor clients, we have not received enough developer feedback requesting them to justify the overhead of developing and maintaining them. To date, we've released beta implementations for Redis and Table Storage in some languages which have seen very low engagement.

For now, this is not something that we'll be moving forward with.

github-actions[bot] commented 4 months ago

Hi @shlomiassaf. Thank you for opening this issue and giving us the opportunity to assist. We believe that this has been addressed. If you feel that further discussion is needed, please add a comment with the text "/unresolve" to remove the "issue-addressed" label and continue the conversation.

shlomiassaf commented 4 months ago

@jsquire This is not 100% accurate

EventProcessorClient contains things that are required for normal operation and belong in the base classes; it should only be opinionated about the Blob implementation.

Let's say I would want to implement my own checkpoint implementation.

I would not be able to scope the checkpoint methods...

    using var scope = ClientDiagnostics.CreateScope(DiagnosticProperty.EventProcessorCheckpointActivityName, ActivityKind.Internal);
    scope.Start();

As it's heavily guarded.

I have no complaints really, as long as I'm able to follow the standards of the SDK, and frankly telemetry is important.

I do think the structure should be different; however, that's arguable, so your way of doing it is fine. But it does require some access or additional wrapping to allow the core behavior to remain.

Maybe PluggableCheckpointStoreEventProcessor should wrap the checkpoint calls in a scope internally, or maybe it should expose the scopes? Either way, it's missing.

And don't get me wrong, I believe the external offset approach is much more diverse and powerful, as it allows massive vertical scaling, and to be honest, using Kafka API will expose seamless checkpoints anyway

shlomiassaf commented 4 months ago

/unresolve

jsquire commented 4 months ago

@shlomiassaf : That's a fair point; it does appear that we overlooked the checkpoint diagnostic scope when pulling out the PluggableCheckpointStoreEventProcessor. This may be an intentional choice because checkpointing is an internal activity type and not defined as part of the official OTel messaging spec.

One thing to note is that there's nothing special about ClientDiagnostics - it is simply an internal helper class for working with the System.Diagnostics classes. Any application can participate in activities and extend the context by using the Activity class and related elements.
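
A minimal sketch of what that could look like in application code, using only the System.Diagnostics types. The source name, activity name, and tag keys are hypothetical placeholders and do not reproduce the names the SDK uses internally; `persist` stands in for whatever checkpoint write the application performs:

```csharp
using System;
using System.Diagnostics;

public static class CheckpointTracing
{
    // Hypothetical source name, not one the SDK defines.
    private static readonly ActivitySource Source =
        new ActivitySource("MyApp.EventHubs.Checkpointing");

    // Wraps a caller-supplied checkpoint write in an internal activity.
    public static void UpdateCheckpoint(
        string partitionId, long sequenceNumber, Action<string, long> persist)
    {
        // StartActivity returns null when no listener samples this source,
        // so every use below is null-conditional.
        using var activity = Source.StartActivity("MyApp.Checkpoint", ActivityKind.Internal);
        activity?.SetTag("myapp.partition_id", partitionId);
        activity?.SetTag("myapp.sequence_number", sequenceNumber);

        try
        {
            persist(partitionId, sequenceNumber);
        }
        catch (Exception ex)
        {
            // Activity.SetStatus requires .NET 6 or later.
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            throw;
        }
    }
}
```

Because the activity is started while the caller's ambient `Activity.Current` is in effect, it is parented to whatever span is already active, which is how it joins the existing trace context.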

For now, I'll leave this open and talk this over with our OTel expert when I'm back in the office next week and see if this is something that we should move up.

> I do think the structure should be different

If it makes you feel any better, so do I. My preference would have been to have the processor client and a lower-level base type in the core library, with just the checkpoint stores in external packages to manage the dependencies. However, I lost that argument 4 years ago. 😄

Unfortunately, this comes down to a different vision from the Azure SDK architecture board on the hierarchy and extensibility in our .NET packages. Should we ever decide to release alternative checkpoint stores, each will come with an opinionated EventProcessorClient implementation bundled in.

shlomiassaf commented 4 months ago

@jsquire Thanks!

I appreciate the honest feedback.

While I also disagree with the board's decision, it is what it is, and I'm sure there were various factors to consider.

I know that ClientDiagnostics is just an internal implementation; however, to replicate it I would need to copy your code and hope nothing changes over time. The logic is that while checkpointing might be internal, it's a feature that exists, and not following its schema will create different models in the trace data, which is hell to manage.

So whether I checkpoint using Blob, SQL, MongoDB, or in-memory, the core tracing should be the same, while I can always hook into the diagnostics and add or modify on the fly. Since all Azure libraries are OTel-ready, the core OTel plus the libraries' own OTel will be enough.

Thanks again!

jsquire commented 4 months ago

fixed by #44486