giraffi / fluent-plugin-amqp

Use AMQP broker to send or receive messages via FluentD
MIT License
15 stars 31 forks source link

Add Message Headers to Fluentd Event from Source #58

Closed AO-StreetArt closed 4 years ago

AO-StreetArt commented 5 years ago

Message Headers from AMQP need to be added to the Fluentd events generated from the amqp source type. Currently, this information (except the timestamp and one field which can be used as a tag) is lost, and the only workaround is to include it directly in the payload.

The input plugin needs to feature the same support for AMQP message headers that the output plugin does.

iuyiuy commented 5 years ago

+1. I've gotten exceptions like this: 2019-01-03 09:57:53 +0000 [error]: #0 unexpected error error_class=Bunny::PreconditionFailed error="PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'syslog' in vhost '/': received none but current is the value 'syslog-dead-letter' of type 'longstr'"

I can't set the x-dead-letter-exchange argument in the plugin and either bunny or rabbitmq are requiring me to specify it.

warmfusion commented 5 years ago

@AO-StreetArt - So if I understand you correctly;

On the SOURCE type (ie in_amqp) plugin, you'd like to keep the amqp even headers and include them as part of the event emitted to the fluent engine.

As you're looking for a similar behavior where we map headers into fields on the incoming payload, that assumes we can marshal the payload into a structured object to then modify it prior to emitting to the rest of the fluent engine.

What would you propose as a failure scenario for payload errors, and how would you see configuration of the target header field being described? Would it include all headers, or a set similar to that defined in the <header> option.

I'm thinking;

<source>
  ...
  <header>
     name my-custom-header
     field my_target_field
     default 'optional value if missing'
  </header>
  <header>
     name a-nested-example
     field request.header   # dot separated nested fields
  </header>
</source>

The outcome of the above configuration would be a json object such as;

{ "my_target_field": "headervalue", "request": { "header": "nestedvalue" } ... rest of event}
MaheshKReddy commented 5 years ago

@AO-StreetArt - So if I understand you correctly;

On the SOURCE type (ie in_amqp) plugin, you'd like to keep the amqp even headers and include them as part of the event emitted to the fluent engine.

As you're looking for a similar behavior where we map headers into fields on the incoming payload, that assumes we can marshal the payload into a structured object to then modify it prior to emitting to the rest of the fluent engine.

What would you propose as a failure scenario for payload errors, and how would you see configuration of the target header field being described? Would it include all headers, or a set similar to that defined in the <header> option.

I'm thinking;

<source>
  ...
  <header>
     name my-custom-header
     field my_target_field
     default 'optional value if missing'
  </header>
  <header>
     name a-nested-example
     field request.header   # dot separated nested fields
  </header>
</source>

The outcome of the above configuration would be a json object such as;

{ "my_target_field": "headervalue", "request": { "header": "nestedvalue" } ... rest of event}

Is this a posibility in near future ?

AO-StreetArt commented 5 years ago

Apologies for the late reply.

I like the proposed design. Our use case requires grabbing specific headers from the event, if available. I also think it makes sense to have a default that is put into the payload when the header isn't present.

You can also take a look at the implementation here if you need ideas: https://github.com/nttcom/fluent-plugin-rabbitmq/issues/8

I raised issues in both plugins for support for this, but my company wound up using fluent-plugin-amqp, so we'd love to get these supported here so we can use it!

MaheshKReddy commented 5 years ago

@warmfusion :: I added a pullrequest to support include_headers configuration, Can you please review ? https://github.com/giraffi/fluent-plugin-amqp/pull/62#partial-pull-merging

extead commented 4 years ago

Hello, @warmfusion, @MaheshKReddy. What about this pull request https://github.com/giraffi/fluent-plugin-amqp/pull/62 ?

warmfusion commented 4 years ago

Merged into master. Releasing as 0.14.0 as its been a long while since we had a release and I want to protect people for any strange dependency issues.

Cheers