logstash-plugins / logstash-input-azure_event_hubs

Logstash input for consuming events from Azure Event Hubs
Apache License 2.0

Is there a way to receive the deviceId of an event? #45

Open moritz-tr opened 4 years ago

moritz-tr commented 4 years ago

I would like to use the plugin to monitor an Azure IoT Hub via its Event Hub endpoint. I am able to receive events and their metadata as described in the decorate_events documentation, but I cannot find a way to log the iothub-connection-device-id of the device that sent the event.

acd-hfi commented 4 years ago

I have been looking at the same thing. It would be very easy to add a few more fields, but I don't know if they'd add them to a generic Event Hub plugin (maybe if a config option was added to indicate that the source is an IoT Hub, or if the fields were put on metadata?). I have been testing lately, and I added the lines below to get the device and module IDs, if present. These are placed directly on the top-level event. I also included the enqueued time since I didn't want to do crazy parsing of metadata to get at it.

Changes needed (add around line 50 in processor.rb just before outputting to the queue):

              # Read the system properties once; they are used several times below.
              sys_props = payload.getSystemProperties

              # Enqueued time as epoch seconds, so a date filter can parse it with "UNIX".
              event.set("enqueuedTimestamp", sys_props.getEnqueuedTime.getEpochSecond)

              # The device id is not present on every event, so check for the key first.
              if sys_props.containsKey("iothub-connection-device-id")
                event.set("connectionDeviceId", sys_props.get("iothub-connection-device-id"))
              end

              # Same check for the module id.
              if sys_props.containsKey("iothub-connection-module-id")
                event.set("connectionModuleId", sys_props.get("iothub-connection-module-id"))
              end
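For anyone who wants to try the "copy only if present" logic outside of Logstash, here is a rough, standalone sketch in plain Ruby. It is not the plugin's code: `StubEvent`, `OPTIONAL_PROPERTIES` and `copy_optional_properties` are made-up names for illustration, and a plain Ruby Hash stands in for the Java map that `payload.getSystemProperties` returns in the real plugin (where `containsKey` and `get` are used instead, as in the snippet above).

```ruby
# Minimal stand-in for a Logstash event (hypothetical, for illustration only).
class StubEvent
  def initialize
    @fields = {}
  end

  def set(name, value)
    @fields[name] = value
  end

  def get(name)
    @fields[name]
  end

  def include?(name)
    @fields.key?(name)
  end
end

# System property key => top-level event field name, as in the snippet above.
OPTIONAL_PROPERTIES = {
  "iothub-connection-device-id" => "connectionDeviceId",
  "iothub-connection-module-id" => "connectionModuleId"
}.freeze

# Copies each optional property onto the event only when its key is present.
# `system_properties` is assumed to be Hash-like (responds to key? and []).
def copy_optional_properties(system_properties, event)
  OPTIONAL_PROPERTIES.each do |property_key, field_name|
    next unless system_properties.key?(property_key)
    event.set(field_name, system_properties[property_key])
  end
  event
end

# Example: an event from a device that has no module id.
event = copy_optional_properties(
  { "iothub-connection-device-id" => "device-01" },
  StubEvent.new
)
puts event.get("connectionDeviceId")
puts event.include?("connectionModuleId")
```

Keeping the key-to-field mapping in one table means another property can be added with a single line, but that is a design choice of this sketch and not something the fork necessarily does.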

If you want to kick it around a bit, I forked this and pushed a gem for it here for testing: https://rubygems.org/gems/logstash-input-azureiothub

The config for it would be exactly the same as for this plugin, except that the input plugin has a different name and you'll want to add a date filter. The date filter parses the enqueued timestamp so that it can be used as an index timestamp in Elasticsearch.

    input {
        azureiothub {
            event_hub_connections => ["<Connection string>"]
            threads => 4
            decorate_events => false
            consumer_group => "logstash"
            storage_connection => "<Connection string>"
            storage_container => "logstash"
        }
    }
    filter {
        date {
            match => [ "enqueuedTimestamp","UNIX" ]
            target => "enqueuedTime"
            remove_field => [ "enqueuedTimestamp" ]
        }
    }
    output {
        elasticsearch {
            hosts => [ "my-elasticsearch:9200" ]
            user => "elastic"
            password => "<Password>"
            index => "azureiothub-%{+YYYY.MM.dd}"
        }
        stdout { }
    }
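To make the timestamp handling a bit more concrete, here is a small plain-Ruby sketch of what the "UNIX" match amounts to conceptually: the integer in `enqueuedTimestamp` is read as seconds since the epoch and turned into a UTC time. The helper names are hypothetical and this is not how Logstash implements the date filter. Note also that in the config above the parsed value goes into `enqueuedTime`, while the `%{+YYYY.MM.dd}` part of the index name is formatted from the event's `@timestamp`, so the index naming here is only an illustration of the date format.

```ruby
require "time"

# Hypothetical helper: interprets a value as whole seconds since the Unix
# epoch and returns the corresponding UTC time.
def enqueued_time_from_epoch(epoch_seconds)
  Time.at(Integer(epoch_seconds)).utc
end

# Hypothetical helper: builds a day-stamped index name in the same
# year.month.day layout as the "azureiothub-%{+YYYY.MM.dd}" pattern.
def daily_index_name(prefix, time)
  "#{prefix}-#{time.strftime('%Y.%m.%d')}"
end

enqueued_time = enqueued_time_from_epoch(1_600_000_000)
puts enqueued_time.iso8601
puts daily_index_name("azureiothub", enqueued_time)
```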

acd-hfi commented 4 years ago

It is probably also worth mentioning that I tested updating the Gradle wrapper to 5.1.1 while working on this, and everything still appears to work without any additional changes. It should be safe to update this project's version of Gradle so that JDK 11 can be used.

nenright commented 3 years ago

@acd-hfi, this was exactly what I was looking for. Worked great! Did you submit a pull request for this?