elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash

[Meta][Azure Event Hub] Client Update Initial Exploration #14613

Open andsel opened 1 year ago

andsel commented 1 year ago

## Problem

The Logstash Azure EventHub input plugin is implemented using a pretty old version of the Azure client library (com.microsoft.azure:azure-eventhubs:2.3.2, released on 6 May 2019). The Azure client libraries have since been reorganized into azure-sdk-for-java, which introduced the azure-messaging-eventhubs library and bumped the version to 5.x (the latest is currently 5.13.1, released 20 Sep 2022). This is a complete rewrite with an incompatible API: packages were moved, and classes were renamed or dropped. Relying on the old library causes compatibility problems and some subtle errors in the plugin that uses it.

## Goals

The goal is to adapt the plugin to use the latest azure-messaging-eventhubs client library.

## Design

To make the code easy to update to future versions of the Azure client library, most of it needs to be rewritten in Java. The Java code base should be wrapped in a thin Ruby shim so that it acts as a Ruby plugin, in the same spirit as the Beats input plugin.

## Changes at library level

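Events are no longer consumed by implementing the `IEventProcessor` interface; instead, callback functions are set on `EventProcessorClientBuilder`, for example through `processEvent()`:

```java
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.EventContext;

// Registered with EventProcessorClientBuilder.processEvent(...), invoked for each received event.
private static void onEvent(EventContext eventContext) {
    EventData event = eventContext.getEventData();
    // process the data
}
```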


- The plugin offered the `initial_position` setting to start consuming the event hub from different positions: `beginning`, `end`, or `look_back`, which starts `initial_position_look_back` seconds in the past. This was implemented with the `EventProcessorOptions.setInitialPositionProvider(positionProviderImpl)` method. In the new library this feature is exposed through `EventProcessorClientBuilder.initialPartitionEventPosition(Map)`, which sets the starting position for each partition individually, something the plugin did not do in the past. As a consequence, the plugin has to know all of its partitions before it can apply `initial_position`.
- For all asynchronous calls the previous version of the library used `CompletableFuture`, while the new one switched to the Reactive Streams types `Flux` and `Mono` from Project Reactor. In the old version every method with an asynchronous result returned a `CompletableFuture`, which allowed asynchronous functional composition through `when_complete` and `then_compose` (the JRuby names of `whenComplete` and `thenCompose`). Now all the APIs that return asynchronous results follow the Reactive Streams protocol, and the plugin code has to be adapted accordingly.
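To make the two points above concrete, here is a minimal JDK-only sketch of how per-partition start positions could be computed: partition IDs are fetched asynchronously, then the `initial_position` / `initial_position_look_back` settings are turned into a map keyed by partition ID. It uses the old `CompletableFuture` composition style, with comments noting the Reactor operators that would likely replace it. `StartPosition`, `fetchPartitionIds` and `initialPositions` are hypothetical names; with the real client the values would presumably be `EventPosition.earliest()`, `EventPosition.latest()` or `EventPosition.fromEnqueuedTime(Instant)`, and the map would be passed to `EventProcessorClientBuilder.initialPartitionEventPosition(Map)`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class InitialPositionSketch {

    // Hypothetical stand-in for com.azure.messaging.eventhubs.models.EventPosition,
    // so that the sketch compiles without the Azure SDK.
    record StartPosition(String kind, Instant enqueuedTime) {
        static StartPosition earliest() { return new StartPosition("earliest", null); }
        static StartPosition latest() { return new StartPosition("latest", null); }
        static StartPosition fromEnqueuedTime(Instant t) { return new StartPosition("enqueued_time", t); }
    }

    // Hypothetical async partition lookup; with the v5 async consumer this would be
    // a Flux<String> (EventHubConsumerAsyncClient.getPartitionIds()).
    static CompletableFuture<List<String>> fetchPartitionIds() {
        return CompletableFuture.supplyAsync(() -> List.of("0", "1", "2"));
    }

    // Maps the plugin settings to one start position per partition.
    static Map<String, StartPosition> initialPositions(List<String> partitionIds, String initialPosition,
                                                      long lookBackSeconds, Instant now) {
        StartPosition position = switch (initialPosition) {
            case "beginning" -> StartPosition.earliest();
            case "end" -> StartPosition.latest();
            case "look_back" -> StartPosition.fromEnqueuedTime(now.minus(Duration.ofSeconds(lookBackSeconds)));
            default -> throw new IllegalArgumentException("unknown initial_position: " + initialPosition);
        };
        Map<String, StartPosition> positions = new LinkedHashMap<>();
        for (String id : partitionIds) {
            positions.put(id, position);
        }
        return positions;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2022-09-20T12:00:00Z");
        Map<String, StartPosition> positions = fetchPartitionIds()
                // thenApply: roughly Flux.collectList().map(...) in Reactor
                .thenApply(ids -> initialPositions(ids, "look_back", 86400, now))
                // whenComplete: roughly doOnSuccess/doOnError in Reactor
                .whenComplete((result, error) -> {
                    if (error != null) System.err.println("partition lookup failed: " + error);
                })
                .join(); // blocking here is comparable to Mono.block()
        System.out.println(positions);
    }
}
```

Using one map for all partitions keeps the old "same position everywhere" behaviour; the new API would also allow a different position per partition if that were ever needed.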

## Java rewrite of the plugin

The plugin uses the client library's pure Java API, orchestrating from Ruby the creation of all the classes needed to build the EventHub client and processor.
For a better developer experience, it would be preferable to have all the code in Java, exposed through a single class, with a thin Ruby shim that drives the creation of that Java plugin class. This would improve readability and might also improve performance, because the bridge between Ruby and Java would be crossed only once per event (on enqueue) rather than many times as it is now, with most of the client code in Ruby.
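A minimal sketch of that shape, assuming a hypothetical `EventHubInput` class: the Ruby shim would construct it with the plugin settings and hand it one callback that pushes into the Logstash queue, so the Ruby/Java boundary is crossed once per event. The event source is simulated with a plain list; in the real plugin events would arrive through the `processEvent` callback of `EventProcessorClient`. None of these names come from the existing plugin.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical single entry point exposed to the Ruby shim.
public final class EventHubInput {
    private final Map<String, Object> settings;
    // The only callback crossing the Java -> Ruby bridge: invoked once per event (the enqueue).
    private final Consumer<Map<String, Object>> enqueue;
    private volatile boolean running;

    public EventHubInput(Map<String, Object> settings, Consumer<Map<String, Object>> enqueue) {
        this.settings = Map.copyOf(settings);
        this.enqueue = Objects.requireNonNull(enqueue);
    }

    // In the real plugin this would build and start an EventProcessorClient;
    // here the event bodies are passed in directly to keep the sketch self-contained.
    public void run(List<String> simulatedBodies) {
        running = true;
        for (String body : simulatedBodies) {
            if (!running) break;
            onEvent(body);
        }
    }

    public void stop() { running = false; }

    // All per-event work stays on the Java side; the field names are illustrative only.
    private void onEvent(String body) {
        enqueue.accept(Map.of(
                "message", body,
                "consumer_group", settings.getOrDefault("consumer_group", "$Default")));
    }

    public static void main(String[] args) {
        List<Map<String, Object>> queue = new ArrayList<>();
        EventHubInput input = new EventHubInput(Map.of("consumer_group", "logstash"), queue::add);
        input.run(List.of("first", "second"));
        System.out.println(queue.size() + " events enqueued");
    }
}
```

On the Ruby side, the shim would then only need to create this object and pass a block that writes to the queue.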

## Foreseeable issues
- All the configuration options exposed by the plugin are mapped directly to the client library. Some of them may no longer be valid in the new version of the client library.

## Implementation plan
- [ ] move the basic functionality, with the required settings, to the new library
- [ ] implement the "advanced" configuration
- [ ] implement all the remaining settings, deprecating those not yet available and providing a migration path for them
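For the last step, a minimal sketch of how settings that the new client cannot honour might be filtered out with a deprecation warning instead of failing. Which options end up unsupported is not decided in this issue, so `legacy_option_example` is a hypothetical placeholder, as are the class and method names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SettingsMigration {

    // Hypothetical placeholder: the real list of unsupported options is still to be determined.
    static final Set<String> NOT_YET_SUPPORTED = Set.of("legacy_option_example");

    // Keeps the settings the new client can use and collects a warning for each dropped one.
    static Map<String, Object> filterSupported(Map<String, Object> userSettings, List<String> warnings) {
        Map<String, Object> supported = new LinkedHashMap<>();
        userSettings.forEach((key, value) -> {
            if (NOT_YET_SUPPORTED.contains(key)) {
                warnings.add("Setting '" + key + "' is deprecated and ignored by the new Event Hubs client");
            } else {
                supported.put(key, value);
            }
        });
        return supported;
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        Map<String, Object> settings = new LinkedHashMap<>();
        settings.put("consumer_group", "logstash");
        settings.put("legacy_option_example", 5);
        Map<String, Object> supported = filterSupported(settings, warnings);
        System.out.println(supported.keySet());
        warnings.forEach(System.out::println);
    }
}
```

Warning rather than failing keeps existing pipeline configurations starting while users migrate.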

----

For reference:
The following table summarizes the biggest expected changes between the two libraries.

| | v2 | v5 |
|---|---|---|
| main library | com.microsoft.azure:azure-eventhubs | com.azure:azure-messaging-eventhubs |
| Azure Event Processor Host (EPH) | com.microsoft.azure:azure-eventhubs-eph | **deprecated**; all the high-level functionality is available in [EventProcessorClient](https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/latest/com/azure/messaging/eventhubs/EventProcessorClient.html), built using [EventProcessorClientBuilder](https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/latest/com/azure/messaging/eventhubs/EventProcessorClientBuilder.html) ([migration guide](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/migration-guide.md)) |
| checkpoint storage library | com.microsoft.azure:azure-storage | com.azure:azure-messaging-eventhubs-checkpointstore-blob |
| package rename | com.microsoft.azure.eventhubs | com.azure.messaging.eventhubs |
| async programming model | Java's `CompletableFuture` | `Mono` and `Flux` (Reactive Streams, from Project Reactor) |
| client creation | `EventHubClient` constructor invocation | [`EventHubClientBuilder.buildConsumerClient()`](https://javadoc.io/static/com.azure/azure-messaging-eventhubs/5.13.1/com/azure/messaging/eventhubs/EventHubClientBuilder.html#buildConsumerClient()) |
| consuming | Implementation of the callback interface [`com.microsoft.azure.eventprocessorhost.IEventProcessor`](https://javadoc.io/static/com.microsoft.azure/azure-eventhubs-eph/2.4.0/com/microsoft/azure/eventprocessorhost/EventProcessorHost.html#registerEventProcessorFactory-com.microsoft.azure.eventprocessorhost.IEventProcessorFactory-com.microsoft.azure.eventprocessorhost.EventProcessorOptions-), registered through `EventProcessorHost.registerEventProcessorFactory` | callback functions set on the client builder, for example [`processEvent()`](https://javadoc.io/static/com.azure/azure-messaging-eventhubs/5.13.1/com/azure/messaging/eventhubs/EventProcessorClientBuilder.html#processEvent(java.util.function.Consumer)) |
| stream positioning | `EventProcessorOptions.setInitialPositionProvider()` | `consumer.receiveFromPartition("0", EventPosition.latest())`; see the doc section ["Consume events with EventHubConsumerAsyncClient"](https://javadoc.io/static/com.azure/azure-messaging-eventhubs/5.13.1/index.html?com/azure/messaging/eventhubs/EventProcessorClientBuilder.html) |

mashhurs commented 1 year ago

💯 I strongly support this improvement. We had a case where LS (the EPH, from Azure's point of view) lost its lease. Even slight backpressure may cause the lease to be lost easily. Additionally, the library has some bugs (e.g.: link) that will not be fixed if we don't upgrade.