elastic / beats

:tropical_fish: Beats - Lightweight shippers for Elasticsearch & Logstash
https://www.elastic.co/products/beats

[Azure] Migrate the azure-eventhub input to the new Azure Event Hubs Client Module for Go #33815

Closed zmoog closed 1 month ago

zmoog commented 1 year ago

What

Azure is introducing a new BETA release of the Azure Event Hubs Client Module for Go aligned with the next-generation Azure SDK.

They are also transitioning the Event Hub client from the independent repo azure/azure-event-hubs-go to the new monorepo azure/azure-sdk-for-go that hosts all Azure SDK clients for Go.

Why

Once released as GA, the new Event Hub package will be the go-to client to connect to Event Hub.

It has a new azeventhubs.Processor that replaces the EventProcessorHost. The new Processor looks like an improved solution over the current one.

For authentication, it leverages Azure Identity credentials from azidentity, which makes more authentication options available.

Requirements

The new Azure Event Hubs Client Module for Go requires Go 1.18 (now available on both Beats 7.x and 8.x).

Resources:

### Tasks
- [ ] https://github.com/elastic/beats/issues/36377
- [ ] https://github.com/elastic/beats/pull/39511
- [ ] https://github.com/elastic/beats/pull/39796
JulGor-elastic commented 1 year ago

One of my customers is asking about Azure AD integration when setting up integrations with Azure data sources, instead of using shared secret values that may change or be difficult to rotate over time. Integration with AAD should make it easier to manage the identities used to configure the different data integrations.

zmoog commented 1 year ago

New 0.4.0 (2023-01-10) beta release for the Azure Event Hubs Go SDK.

zmoog commented 10 months ago

Quick update on the maturity level: Azure Event Hubs Client Module for Go reached GA on version 1.0.0.

Martin-Kemp commented 10 months ago

Just mentioning that this would be very useful to us. It's currently blocking us from using Beats with Event Hubs.

zmoog commented 10 months ago

Microsoft marked the github.com/Azure/azure-event-hubs-go/v3 package as deprecated in the v3.6.1 release.

P1llus commented 10 months ago

I think if there are plans to implement a "v2" of the input, for example by running the old and new one side by side for a while behind a config flag (we did this for httpjson), then certain other improvements could be implemented as well:

  1. Currently we do not expose or use any of the configuration options the SDK client supports for tweaking throughput. I'm unsure what this new library exposes, but things like message counts, sizes, etc. would be a good start.
  2. The S3, GCS, and ABS inputs (and maybe others) support the concept of "worker counts": a single goroutine fetches the list of available new messages and sends each unprocessed event to a "worker" goroutine, which processes it and updates its state if necessary. This allows the input to scale vertically, not only horizontally.
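The worker-count pattern described in point 2 can be sketched in plain Go. This is a simplified illustration, not the actual Beats code; the function and channel names are made up:

```go
package main

import (
	"fmt"
	"sync"
)

// processMessages illustrates the pattern: a single fetcher goroutine
// hands messages to a pool of worker goroutines over a channel.
func processMessages(messages []string, workers int) []string {
	jobs := make(chan string)
	results := make(chan string, len(messages))

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range jobs {
				// Real code would decode, enrich, and publish the event here,
				// then update its checkpoint/offset state if necessary.
				results <- "processed:" + msg
			}
		}()
	}

	// Single fetcher: in the real input this would poll the source
	// (Event Hubs, S3, GCS, ABS) for new, unprocessed messages.
	for _, m := range messages {
		jobs <- m
	}
	close(jobs)

	wg.Wait()
	close(results)

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := processMessages([]string{"a", "b", "c"}, 2)
	fmt.Println(len(out)) // 3
}
```

Raising the worker count lets one input instance use more cores without running additional instances, which is the vertical-scaling point made above.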
Martin-Kemp commented 9 months ago

I've started working on this and have managed to get it working with the new SDK.

Here's my fork: https://github.com/Martin-Kemp/beats/tree/new_azure_eventhubs_sdk

It's still very basic. I basically just replaced the SDK, deleted the eph.go file, and replaced it with a processor.go containing the same logic.

It needs a bit more work, but I compiled it and it's working. Importantly, it now adds the partitionKey field to the messages, which is what we need from the new SDK.

Martin-Kemp commented 9 months ago

I wouldn't mind a bit of help to get this over the line. I'm not a Go programmer nor am I very familiar with this repo :)

zmoog commented 4 months ago

Hey @Martin-Kemp! We are starting to upgrade the input to the new SDK. How is your fork doing?

Martin-Kemp commented 4 months ago

Hi @zmoog, that's great news! My fork has been working mostly fine. There seem to be some disconnect issues at higher message rates, but I've yet to figure out the root cause. We were also having Azure issues at the same time, so it could have been that.

zmoog commented 4 months ago

It's great to hear the fork is working for you. I'll update you on this issue as we progress.

Martin-Kemp commented 4 months ago

@zmoog how is work on the new input progressing? I'd be happy to do some testing if it'll help?

zmoog commented 4 months ago

Hey @Martin-Kemp, the work is progressing. I am currently working on https://github.com/elastic/beats/issues/36377 to upgrade the input to input API v2 as a preparation step. I'll open a PR for this change soon.

The current plan is to offer both the legacy (v1) and modern (v2) SDKs with a configuration option.

zmoog commented 3 months ago

@Martin-Kemp, I have made some progress on this topic.

First, review is in progress on the PR that updates the azure-eventhub input to input API v2. This PR is a prerequisite to moving to the new SDK.

Second, I pushed a working prototype of the azure-eventhub input using the new SDK at https://github.com/zmoog/beats/pull/1 (it's on my personal fork because the first PR is not merged).

> there seems to be some disconnect issues at higher message rates but I'm yet to figure out the root cause.

During the initial tests, the input stopped working due to network issues causing a DNS resolution failure. I'm adding a retry mechanism to restart the processor gracefully when it exits in similar circumstances.

If you get a chance to give it a try, let me know how it works. I'm actively working on this PR to add a couple of missing features.

I'll update this issue when additional changes are ready.

zmoog commented 3 months ago

To run the azure-eventhub input in this test, we need to use the following module config:

```yaml
- module: azure
  # All logs
  activitylogs:
    enabled: true
    var:
      # Event hub name containing the activity logs; overwrite the default value if the logs are exported to a different event hub.
      eventhub: "sdh4552"
      # Consumer group name that has access to the event hub; we advise creating a dedicated consumer group for the azure module.
      consumer_group: "$Default"
      # The connection string required to communicate with Event Hubs; steps to generate one: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-get-connection-string
      connection_string: "<redacted>"
      # The name of the storage account where the state/offsets will be stored and updated.
      storage_account: "mbrancageneral"
      # The name of the storage account container to store the offset information in.
      storage_account_container: "<redacted>"
      # The storage account key, used to authorize access to data in your storage account.
      # (processor v1)
      storage_account_key: "<redacted>"
      # (processor v2)
      storage_account_connection_string: "<redacted>"
      # Processor version; valid values are v1 or v2.
      processor_version: "v2"
```

Currently, there are two new config options to set:

```yaml
      # processor v2
      storage_account_connection_string: "<redacted>"

      # processor version, valid values are v1 or v2
      processor_version: "v2"
```

The easy-to-use function to create a container client requires a connection string instead of a key, so I added the `storage_account_connection_string` option. We'll see if it's still possible to use the `storage_account_key`.

The input can work with the legacy SDK (v1) or the modern SDK (v2). You can pick one or the other by setting `processor_version` accordingly.

Martin-Kemp commented 3 months ago

> @Martin-Kemp, I have made some progress on this topic.
>
> First, the review is in progress on the PR that updates the azure-eventhub input to input API v2. This PR is a prerequisite to moving to the new SDK.
>
> Second, I pushed a working prototype of the azure-eventhub input using the new SDK at https://github.com/zmoog/beats/pull/1 (it's on my personal fork because the first PR is not merged).
>
> > there seems to be some disconnect issues at higher message rates but I'm yet to figure out the root cause.
>
> During the initial tests, the input stopped working due to network issues causing a DNS resolution failure. I'm adding a retry mechanism to restart the processor gracefully when it exits in similar circumstances.
>
> If you get a chance to give it a try, let me know how it works. I'm actively working on this PR to add a couple of missing features.
>
> I'll update this issue when additional changes are ready.

@zmoog thanks so much for all the work so far! I'll test your fork in the next few days and report back.

Martin-Kemp commented 3 months ago

@zmoog, I got your change built and running in our dev environment but then realised it's not going to work with actual data because we use the PartitionKey later in our pipeline to determine our index name in Elasticsearch. Would you mind adding the PartitionKey to the payload? I made a comment in your fork's PR to show where.

zmoog commented 3 months ago

> Would you mind adding the PartitionKey to the payload? I made a comment in your fork's PR to show where.

Yep, sure. I was considering adding the partition key alongside the partition ID.

zmoog commented 3 months ago

> Would you mind adding the PartitionKey to the payload? I made a comment in your fork's PR to show where.

I can't find the comment, but I added an (optional) partition key field to the event hub metadata.

If the `azeventhubs.ReceivedEventData` contains a partition key, the input sets it as the `azure-eventhub.partition_key` field alongside the new `azure-eventhub.partition_id` field and all the other `azure-eventhub.*` fields.

(screenshot of the resulting event showing the new fields)
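The optional-field behavior described above can be sketched in plain Go. Here `receivedEvent` is a stand-in for `azeventhubs.ReceivedEventData` (whose partition key is optional), and `eventHubMetadata` is a hypothetical helper, not the input's actual code; the field names follow the ones mentioned in this thread:

```go
package main

import "fmt"

// receivedEvent stands in for azeventhubs.ReceivedEventData; the
// partition key is a nil pointer when the sender did not set one.
type receivedEvent struct {
	PartitionKey *string
}

// eventHubMetadata builds the azure-eventhub.* fields attached to each
// event. The partition key is included only when present, so events
// without one don't carry an empty field.
func eventHubMetadata(ev receivedEvent, partitionID, eventhub, consumerGroup string) map[string]interface{} {
	fields := map[string]interface{}{
		"partition_id":   partitionID,
		"eventhub":       eventhub,
		"consumer_group": consumerGroup,
	}
	if ev.PartitionKey != nil {
		fields["partition_key"] = *ev.PartitionKey
	}
	return fields
}

func main() {
	key := "device-42"
	with := eventHubMetadata(receivedEvent{PartitionKey: &key}, "3", "msb.os", "$Default")
	without := eventHubMetadata(receivedEvent{}, "3", "msb.os", "$Default")
	fmt.Println(with["partition_key"], len(without)) // device-42 3
}
```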

Martin-Kemp commented 3 months ago

> I can't find the comment, but I added an (optional) partition key field to the event hub metadata.

Comment is here: https://github.com/zmoog/beats/pull/1 but it's fine, your way is better :)

zmoog commented 3 months ago

Opened PR https://github.com/elastic/beats/pull/39796 in the main repo.

zmoog commented 1 month ago

Hey @Martin-Kemp, are you still using this version? How is it going?

Martin-Kemp commented 1 month ago

> Hey @Martin-Kemp, are you still using this version? How is it going?

Hey @zmoog, I've been using my buggy version due to a blocker we had with Azure. That's resolved now, and I've deployed the latest commit from the PR. It connects to the event hubs successfully and data appears to be flowing. I'll keep an eye on it and report back.

Martin-Kemp commented 1 month ago

I'm getting some serialization errors:

```json
{"log.level":"debug","@timestamp":"2024-07-30T13:41:31.918Z","log.logger":"input.azure-eventhub","log.origin":{"function":"github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub.(*messageDecoder).Decode","file.name":"azureeventhub/decoder.go","file.line":84},"message":"deserializing multiple messages to a `records` object returning error: invalid character 'Ã' looking for beginning of value","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"debug","@timestamp":"2024-07-30T13:41:31.918Z","log.logger":"input.azure-eventhub","log.origin":{"function":"github.com/elastic/beats/v7/x-pack/filebeat/input/azureeventhub.(*messageDecoder).Decode","file.name":"azureeventhub/decoder.go","file.line":90},"message":"deserializing multiple messages to an array returning error: invalid character 'Ã' looking for beginning of value","service.name":"filebeat","ecs.version":"1.6.0"}
```

I don't think I saw this with my version. Also some messages are not making it through a downstream logstash with:

```
"status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [Payload] tried to parse field [Payload] as object, but found a concrete value"}}}}
```

I think I'll have to roll back and investigate a bit further.

zmoog commented 1 month ago

> I'm getting some serialization errors [..] Also some messages are not making it through a downstream logstash with:

Oh wow, this is interesting.

Can you share a few message samples that are generating these errors, so I can add a test? If you don't feel like sharing log samples on a public GitHub issue, you can send them to maurizio.branca@elastic.co.

Thanks!

Martin-Kemp commented 1 month ago

Looks like there was a device sending strings (not JSON). I think that's what caused the serialization error. I'm setting up a dev data stream to run this end to end, then I'll report back.

zmoog commented 1 month ago

> Looks like there was a device sending strings (not json). I think that's what caused the serialization error. Setting up a dev data stream to run this end to end then I'll report back.

Yeah, currently, the input assumes all messages are JSON documents, mostly coming from Diagnostic Settings. If the input can't decode a message, it should forward it to Elasticsearch as a string. The input shouldn't drop the message; I'll double-check.
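The decode-then-fall-back strategy described here can be sketched in plain Go. This is an illustration of the approach, not the input's actual `decoder.go`; `decodeMessages` and its helper are hypothetical names:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodeMessages tries the Diagnostic Settings-style {"records": [...]}
// envelope first, then a bare JSON array, and finally falls back to
// forwarding the raw payload as a single string rather than dropping it.
func decodeMessages(raw []byte) []string {
	var envelope struct {
		Records []json.RawMessage `json:"records"`
	}
	if err := json.Unmarshal(raw, &envelope); err == nil && envelope.Records != nil {
		return rawToStrings(envelope.Records)
	}

	var list []json.RawMessage
	if err := json.Unmarshal(raw, &list); err == nil {
		return rawToStrings(list)
	}

	// Not JSON (e.g. a device sending plain strings): keep it as-is.
	return []string{string(raw)}
}

func rawToStrings(msgs []json.RawMessage) []string {
	out := make([]string, 0, len(msgs))
	for _, m := range msgs {
		out = append(out, string(m))
	}
	return out
}

func main() {
	fmt.Println(decodeMessages([]byte(`{"records":[{"a":1},{"b":2}]}`)))
	fmt.Println(decodeMessages([]byte(`[{"a":1}]`)))
	fmt.Println(decodeMessages([]byte(`not json at all`)))
}
```

The key property is the last branch: a non-JSON payload still produces one event carrying the raw string, so nothing is silently dropped.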

Martin-Kemp commented 1 month ago

Hi @zmoog. I tried a very simple test by using the Azure portal to send a message to the event hub and then setting stdout as the output. Here are the results.

Message sent to the event hub:

```json
[
    {
        "key1": "value1",
        "key2": "value2",
        "key3": "value3",
        "nestedKey": {
            "nestedKey1": "nestedValue1"
        },
        "arrayKey": [
            "arrayValue1",
            "arrayValue2"
        ]
    }
]
```

Message printed in stdout of the beat:

```json
{
  "@timestamp": "2024-08-07T13:41:15.236Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.16.0"
  },
  "input": {
    "type": "azure-eventhub"
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "SandboxHost-638586344103559027"
  },
  "agent": {
    "ephemeral_id": "c03c3930-b5a3-4073-9809-a1f8d314f5dc",
    "id": "763ba0e5-934c-4a80-9adc-177db2dfc17f",
    "name": "SandboxHost-638586344103559027",
    "type": "filebeat",
    "version": "8.16.0"
  },
  "message": 0,
  "azure": {
    "offset": 0,
    "sequence_number": 0,
    "enqueued_time": "2024-08-07T13:41:14.239Z",
    "partition_id": "3",
    "eventhub": "msb.os",
    "consumer_group": "$Default"
  }
}
```

So I think something is still not right here. I noticed with our production data I was also getting `"message": 0` and thought it might be something to do with our data, but if it doesn't work with this simple message then I don't think that's the case.

zmoog commented 1 month ago

> Message sent to eventhub:
>
> ```json
> [
>     {
>         "key1": "value1",
>         "key2": "value2",
>         "key3": "value3",
>         "nestedKey": {
>             "nestedKey1": "nestedValue1"
>         },
>         "arrayKey": [
>             "arrayValue1",
>             "arrayValue2"
>         ]
>     }
> ]
> ```

Hey @Martin-Kemp, thanks for the sample log. I'm giving it a try.

Do you get a different outcome from the old input?

zmoog commented 1 month ago

> Do you get a different outcome from the old input?

It works okay on 8.15.0.

Checking what the decoder is doing differently with this sample. Adding a test to the suite with your case.

zmoog commented 1 month ago

My bad; I made an embarrassing mistake. I just pushed a fix. @Martin-Kemp, please let me know if it now works for you when you have time to test it.

Thank you for checking and catching the error.

Martin-Kemp commented 1 month ago

> My bad; I made an embarrassing mistake. I just pushed a fix. @Martin-Kemp, please let me know if it now works for you when you have time to test it.
>
> Thank you for checking and catching the error.

Thanks @zmoog I'll try it tomorrow and report back 👍

Martin-Kemp commented 1 month ago

@zmoog, this worked in my test environment. I'll deploy to production on Monday.