awslabs / amazon-kinesis-agent

Continuously monitors a set of log files and sends new data to the Amazon Kinesis Stream and Amazon Kinesis Firehose in near-real-time.
355 stars 222 forks source link

Optimized record size for concatenable events like JSON #57

Open jeisinge opened 7 years ago

jeisinge commented 7 years ago


AWS Kinesis Firehose has a pricing policy that is great for large records. However, for small records of sizes of 1 KB, the customer ends up paying 5x the price of the same amount of data. This is because Kinesis rounds up to the nearest 5 kb.

Further, certain records can be concatenated together and still processed separately down stream. For example, many downstream big data products like Redshift, Hadoop and Spark support multiple JSON documents per line.

So, the following JSON events are equivalent in the above products:

{"event_id": 123, "doc_id": 9372}
{"event_id": 124, "doc_id": 7642}
{"event_id": 125, "doc_id": 1298}


{"event_id": 123, "doc_id": 9372}{"event_id": 124, "doc_id": 7642}{"event_id": 125, "doc_id": 1298}


It would be great if Kinesis Agent automatically combined events that can be combined into as large of records as possible.

Other Similar Items

innonagle commented 7 years ago

👍 The workaround given for #5 would be, at best, hard to implement for say Apache access logs.

In addition compressing the data prior to sending it Kinesis Firehose would also be appreciated. I have seen click stream data which compress up to 94% when in JSON format. This causes Firehose to write files to S3 much more frequently than is really necessary. While compressing in the agent before sending to the delivery stream is not as efficient overall it does allow producers to scale even more.

innonagle commented 7 years ago

@jeisinge I've forked this repo ( and added the ability to put multiple rows in a file into one Firehose record. In the flow definition where you specify filePattern and DeliveryStreamName specify maxDataBytes. You can set this to any integer value, multiples of 5120 are best for Firehose.

I've been running this patch in a production environment for 2 weeks and it's reduced our daily Firehose costss by over 80%.


I didn't have time to crate an RPM. The approach I've taken is to build a jar with Maven and Ant and then install the original RPM via yum and then overwrite the installed jar with the patched one.

Java is not my forté so I am unsure how to run the test suite.

jeisinge commented 7 years ago

@lennynyktyk , fantastic! I'll take a look.

After more testing, it seems I misstated compatibility down stream. Multiple records per line do not work for Spark. So, we have gone to an external process of adding a tab (\t) character after X bytes and using the multiLineStartPattern option with it being set to "\\t". Obviously, this is not ideal because it adds complexity and reduces the ability to stream data to Kinesis. So, I am very much looking forward to your fork!

innonagle commented 7 years ago

@jeisinge FYI, I noticed a issue today with files who's size never exceeds the maxDataBytes parameter.

If the file never exceeds maxDataBytes that data never generates a record. This may be a larger problem with the kinesis agent. I'm not sure when/if I'll have time to fix it but just putting it out there.

chaochenq commented 7 years ago

@lennynyktyk I'm looking at the PRs now, sorry about the delay! Did you get a chance to fix the issue you mentioned above?

innonagle commented 7 years ago

@chaochenq I don't think this is an issue with my PR but rather with how the Kinesis Agent handles lines which do contain the delimiter. E.g. in a normal Kinesis Agent configuration using the normal line parser if there is no newline character in the file the contents of the file will never be sent to Kinesis. This is understandable because the agent is looking for the delimiter and under the assumption there is more data to be added to the record.

There are ways of fixing this though I believe they are out of the scope of #60 . For example, keeping the last record in a buffer and then when the file under observation is rotated then have the agent submit the fragment to Kinesis.