👍 on the functionality. It feels a little weird to add our own gzip decompression code, when logstash already has a gzip decompression codec... but since the `cloudwatch_logs` setting can't work without gzip, I think it's the right thing to do. It'd be weird to tell people to set the setting here, and then install and configure that gzip codec as well.
Please update the README to describe the new settings.
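For reference, the decompression itself is tiny in Ruby. A minimal sketch using only the standard library (the method name is just for illustration):

```ruby
require "zlib"
require "stringio"

# Gunzip a single record's payload. Zlib ships with Ruby, so the
# "own gzip code" in question amounts to a few lines like these.
def gunzip(bytes)
  Zlib::GzipReader.new(StringIO.new(bytes)).read
end
```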
Agree re: the codec. Even if the plugin simply used `codec => gzip`, we would still have to duplicate e.g. the JSON codec for working with the event. This felt better because the plugin can then punt to whatever codec for the actual underlying data: plain, json, whatever.
That's true, and actually brings up a point that I'm not clear on. Your current implementation is assuming that the user specified `codec => json {}` in their logstash config, right? Since it's the `@codec.decode(record)` line that is decoding the raw JSON data into a Hash that you then access.
Should this be modified so that we explicitly JSON decode the overall CloudWatch message, and then apply the user-specified codec to each individual `logEvents[n]["message"]` field? I can't try it out myself to be sure right now, but that's what the AWS docs make it look like:
```json
{
  "owner": "123456789012",
  "logGroup": "CloudTrail",
  "logStream": "123456789012_CloudTrail_us-east-1",
  "subscriptionFilters": [
    "RootAccess"
  ],
  "messageType": "DATA_MESSAGE",
  "logEvents": [
    {
      "id": "31953106606966983378809025079804211143289615424298221568",
      "timestamp": 1432826855000,
      "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
    },
    {
      "id": "31953106606966983378809025079804211143289615424298221569",
      "timestamp": 1432826855000,
      "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
    },
    {
      "id": "31953106606966983378809025079804211143289615424298221570",
      "timestamp": 1432826855000,
      "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root\"}"
    }
  ]
}
```
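A rough sketch of what I mean, assuming the gunzipped record is the JSON envelope above (the method and the event calls here are illustrative, not taken from the plugin):

```ruby
require "json"

# Sketch: decode the CloudWatch Logs envelope ourselves, then hand each
# inner message string to whatever codec the user configured.
def decode_cloudwatch_record(raw_json, codec)
  envelope = JSON.parse(raw_json)
  shared   = envelope.reject { |k, _| k == "logEvents" }

  envelope.fetch("logEvents", []).each do |log_event|
    # The user codec (plain, json, ...) only ever sees the inner payload.
    codec.decode(log_event["message"]) do |event|
      shared.each { |k, v| event.set(k, v) }
      event.set("id", log_event["id"])
      event.set("timestamp", log_event["timestamp"])
      yield event
    end
  end
end
```

That way the envelope parsing is fixed, and the configured codec only has to understand the inner `message` payloads.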
It feels weird to put special cases in for gzip and CloudWatch in this input. For discussion, could this CloudWatch functionality exist instead as a separate input or a codec?
@codekitchen That's where it starts to get murky. For example, you can send multiple CloudWatch log groups to a single Kinesis stream. If the specified codec were to parse each message, every log group's messages would have to fit that codec. With the above implementation, I was leveraging both built-in CloudWatch Logs subscription filters and punting downstream to plugins like the JSON filter.
@jordansissel A separate input would duplicate a majority of this plugin's work. I've only looked at a few codec plugins, and porting this work to one seems doable. What isn't readily apparent to me, however, is the contract between inputs/outputs and a codec's `#decode` and `#encode` methods.
As I understand things, strings would need to be sent to e.g. `plain` or `json`. Conversely, the Kinesis record's byte buffer would be needed by a `cloudwatch_logs` codec. Is this something the framework accommodates?
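To make the question concrete, here's a hypothetical skeleton of such a codec (the class and names are made up, and whether `data` could be a byte buffer rather than a String is exactly the open question):

```ruby
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/namespace"
require "json"

# Hypothetical sketch only: an input hands #decode one record's payload,
# and the codec yields zero or more events back to the block.
class LogStash::Codecs::CloudWatchLogs < LogStash::Codecs::Base
  config_name "cloudwatch_logs"

  def decode(data, &block)
    # `data` would need to be the gunzipped JSON envelope here.
    envelope = JSON.parse(data)
    envelope.fetch("logEvents", []).each do |log_event|
      yield LogStash::Event.new(log_event)
    end
  end
end
```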
New to working with Logstash's code base at this level. Sorry for anything outlandish above. 💃
@threadwaste That doesn't seem like a problem with what I'm suggesting, though. You could use a codec in the simple case when you know that all the messages need the same processing; otherwise, skip the codec and use filters or some other mechanism.
I definitely think we should explicitly decode the overall JSON message rather than depending on the codec being set to JSON, since that's a core property of CloudWatch Logs, not something that should be configurable.
@jordansissel there was some discussion in the original ticket about making this a codec, but we decided against it because it would make configuration more complex, and the codec wouldn't be useful outside of this input plugin. Perhaps there's a modularity argument for making it a codec anyway, though.
@codekitchen @jordansissel I finally have the time to revisit the changes discussed above. Before implementing, I quickly threw together what a codec would look like as an alternative. This is a very rough proof of concept.
It does require a change to this plugin; specifically, a patch along these lines. This was a non-breaking change in the tests I ran; the `plain`, `json`, et al. codecs all work with this plugin the same as they do today.
From a Kinesis + CloudWatch Logs user perspective it makes sense to me. I think it also makes the configuration simpler and more explicit. Similar codecs already exist for specific message formats, so I don't think it's too specific an implementation, either. It might even have use for S3 exports, though I can't find any docs confirming an equivalent format.
At any rate, sorry for dragging this out. I'm happy to wrap this PR up by implementing the above items. The codec was just easy to throw together, and admittedly I got a bit curious.
I'm told that S3 exports are in a simpler message-per-line format. I'm cool with doing a separate codec if you prefer that approach, though. We'll just want to carefully document the interaction between the two, and maybe give an example config with an additional JSON filter, since that's probably the most common use case. Thanks again!
What are your thoughts on the patch gist? The viability of the codec hinges on it or something similar. If you don't see an issue modifying the Kinesis input plugin accordingly, I'd personally say the codec route seems more "naturally" Logstash.
Alternate proposal: If the cloudwatch-kinesis format won't appear anywhere else, maybe it should be a new input plugin instead of a codec?
An input plugin for CloudWatch Logs from Kinesis duplicates a lot of the code written here. So, configuration options on this input, or a codec, makes more sense in my head. Ultimately, I'll defer to the Logstash authorities on that matter. I'm happy -- and pretty motivated -- to implement whichever of the lot is deemed best for the long run.
Let's go with the codec approach, along with the proposed change to this input plugin. If down the road we determine that a new input plugin is a better choice, we'll still have the flexibility to do that.
I think that'll also naturally solve the gzip'd data use case that was mentioned in the original ticket, since it'll allow for using the `gzip_lines` codec and then a `json` filter.
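For example, a minimal pipeline along those lines (the stream name is a placeholder, and other kinesis options are elided):

```
input {
  kinesis {
    kinesis_stream_name => "my-stream"  # placeholder
    codec => gzip_lines
  }
}

filter {
  json {
    source => "message"
  }
}
```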
@codekitchen Sounds good to me. I will rework this pull request to instead be a version of the patch above. I'll also clean up the codec, and get a Gem produced and published. Thanks!
@codekitchen @threadwaste I just updated this plugin to work with 5.0 so this may need a rebase.
@codekitchen @suyograo This PR now encompasses #2 and #10. Happy to rename branches and/or close this PR in favor of opening it anew. Let me know.
If this work is accepted and merged, I will polish the cloudwatch_logs codec I wrote to be used with these changes.
Great stuff, thank you @threadwaste! Glad to see that upgrade problem was a spec issue, not a code issue, too. I'll cut a new gem.
These changes were removed after version 2.0.1; they don't exist in 2.0.2 or 2.0.3.
This is an initial attempt at adding support for CloudWatch Logs subscription data and general gzip-compressed records per https://github.com/logstash-plugins/logstash-input-kinesis/issues/2. I'm looking for feedback before cleaning up what's been done. This has been tested against live CloudWatch Logs data.
This PR presents two new configuration options, `compression` and `cloudwatch_logs`. Their defaults maintain the plugin's current behavior.
The `compression` option accepts `nil` or `"gzip"`. When `nil`, the plugin's original behavior is retained. When set to `"gzip"`, the plugin decompresses each record with gzip.
When `cloudwatch_logs` is set to `true`, each record is split on its `logEvents` field. The end result is n events containing the original record's `logGroup`, `logStream`, `messageType`, `owner`, and `subscriptionFilters` fields merged with each `logEvents` document. The original `logEvents` field is removed from each split event. When `cloudwatch_logs` is `true`, `compression` must be set to `"gzip"`.
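For illustration, a pipeline using both options might look like this sketch (the stream name is a placeholder, and other kinesis options are elided):

```
input {
  kinesis {
    kinesis_stream_name => "cloudwatch-subscription-stream"  # placeholder
    compression => "gzip"
    cloudwatch_logs => true
  }
}

filter {
  # Each split event's message field still holds the raw log line,
  # so parse it downstream, e.g. with the json filter.
  json {
    source => "message"
  }
}
```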