logstash-plugins / logstash-input-s3

Apache License 2.0

[WIP] Refactor of the plugin for better performance and scaling #84

Closed ph closed 10 months ago

ph commented 8 years ago

State

This PR is not ready for a full review, but I would like feedback with the architecture I took.

Motivation:

The S3 plugin's code base was a bit monolithic and really hard to test. This change creates smaller classes with fewer responsibilities that are easier to test and reason about.

This PR also makes a really important change and introduces multithreaded workers, so the plugin can effectively read files and download them at the same time.

These commits and changes pave the way to supporting S3 notifications; most of the code will be reusable to create a new input that will be stateless. I believe we only need to replace the Poller class with an SQSListener, and the rest of the code shouldn't change.
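As an illustration of that swap (a minimal sketch, not the code in this PR): if the rest of the pipeline depends only on a source that yields object keys, a stateful poller and a stateless notification listener become interchangeable. The names `ObjectSource`, `keys`, and `run_once`, the constructor arguments, and the simplified `{"key": ...}` message shape are all assumptions made for the example.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Protocol


class ObjectSource(Protocol):
    """Hypothetical interface: anything that yields S3 object keys to process."""

    def keys(self) -> Iterator[str]: ...


class Poller:
    """Stateful source: lists the bucket and skips keys it has already seen."""

    def __init__(self, list_bucket: Callable[[], Iterable[str]], seen: Iterable[str] = ()):
        self._list_bucket = list_bucket  # assumed callable returning all keys
        self._seen = set(seen)

    def keys(self) -> Iterator[str]:
        for key in self._list_bucket():
            if key not in self._seen:
                self._seen.add(key)
                yield key


class SQSListener:
    """Stateless source: takes keys from notification messages."""

    def __init__(self, receive_messages: Callable[[], Iterable[Dict[str, str]]]):
        # Assumed callable returning simplified messages of the form {"key": ...}.
        self._receive_messages = receive_messages

    def keys(self) -> Iterator[str]:
        for message in self._receive_messages():
            yield message["key"]


def run_once(source: ObjectSource, process: Callable[[str], str]) -> List[str]:
    """The downstream code only calls source.keys(), so either source works."""
    return [process(key) for key in source.keys()]
```

Here `run_once(Poller(...), handler)` and `run_once(SQSListener(...), handler)` share the same downstream code, which is the property described above.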

Remaining

brandond commented 8 years ago

Are you still working on this? I have a bucket with about a year of CloudTrail data from a dozen or so accounts that I'd like to pull in, and the current S3 input plugin is nowhere near capable of handling it. It took a half hour just to pull in the last week of data from one region in one little-used account.

Ideally, the refactored code would:

I'm glad to contribute to some of this, but unfortunately this plugin appears to be dead in the water, with multiple stale open issues and PRs with no response.

ph commented 8 years ago

@brandond The plugin isn't dead; refactoring and review take time.

I think the core of this PR is flexible enough to add your recommendations concerning polling. I actually wanted to make the core easy to reuse to support S3 notifications, so we can scale the processing horizontally with multiple instances.

brandond commented 8 years ago

Thanks @ph. I was a little scared off by the period of inactivity and open items, although I do see that some of the open PRs have been merged but not closed.

I have some additional thoughts for things that I'd like to see for my use case - configurable parallelism and batch sizes, depth-first searching through subkeys (a la directory hierarchy), etc. Are you open to PRs, and if so, should I work on master, or your threading fork? Is the threading fork even usable in its current state?

ph commented 8 years ago

@brandond It was usable. Give me a few days to clean it up; I will try to get that merged in the upcoming weeks. I will keep you in the loop and would really like to collaborate to make it better. I've been doing the same work on the S3 output, so these plugins should be much better in the near future.

Concerning the parallelism, I am not sure we should put a lot of energy into that in this plugin. It's OK for older buckets that we want to consume, but in real-world usage I think people should use S3 Event Notifications via SQS. This model is easier to deal with than reading from the bucket, since AWS will just push new files as SQS messages. At the same time, the parallelism will come from multiple instances consuming the same queue. I took great care in this refactor to support that feature; it's actually only a matter of adding new fields for the SQS queue and replacing the Poller class with a Push class.

ph commented 8 years ago

to keep you in the loop @geekpete

brandond commented 8 years ago

I opened #86 to describe some of my thoughts around optimized parallel polling for CloudTrail. You're probably right that SQS notification is more efficient than polling, at least once things are up and running.

I still don't have a good way to get my existing 2.5 million (and growing) files ingested though - the current approach of enumerating the entire bucket every poll is a non-starter for a lot of use cases.

geekpete commented 8 years ago

How about a lambda to queue up the existence of new files landing?

brandond commented 8 years ago

Not sure why you'd need a Lambda to queue it up when you can just point the notifications directly at SQS. Alternately you could point S3 at SNS and subscribe multiple endpoints to the topic (Lambda, SQS, etc). That's the best way to do fanout, since each S3 event can only notify a single endpoint.

geekpete commented 8 years ago

Ah forgot about that, makes it easy that way.

So @jarpy had an idea of loading the files into ES directly from the cloud. A service that ingests cloudtrail files into an ES endpoint. A lambda that sends newly detected files at the service. This allows reprocessing manually against the service if needed, eg reindex.

brandond commented 8 years ago

Yeah you could definitely cut out the middle man and have Lambda get the S3 notification, pull down the file, unpack events, and stuff them into ElasticSearch. We've been doing that (with some filtering and data augmentation) and storing stuff into Dynamo. This is specifically what we're looking into replacing with LogStash, if we could get the S3 ingest streamlined ;)

If you wanted to go halfway with it, you could probably use Lambda to grab the S3 notification, download the archives, unpack the event JSON, and then toss them in as SQS message payloads. Get LogStash to pull off the queue, do filtering, and output to ElasticSearch. Wouldn't even require any new code on the LogStash side.
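A rough sketch of the unpacking step in that halfway approach, assuming the usual CloudTrail log layout (a gzip-compressed JSON document with a top-level `Records` array). The function names and the `send` callback are hypothetical; in a real Lambda, `send` would wrap whatever SQS client call is in use.

```python
import gzip
import json
from typing import Callable, List


def unpack_cloudtrail(archive: bytes) -> List[dict]:
    """Return the individual events from one gzip-compressed CloudTrail file."""
    document = json.loads(gzip.decompress(archive))
    return document.get("Records", [])


def forward_events(archive: bytes, send: Callable[[str], None]) -> int:
    """Serialize each event as its own message body and hand it to `send`.

    `send` is a hypothetical callback standing in for the queue client.
    Returns the number of events forwarded.
    """
    events = unpack_cloudtrail(archive)
    for event in events:
        send(json.dumps(event, separators=(",", ":")))
    return len(events)
```

With one event per message, the consumer on the Logstash side only needs to parse each message body as JSON.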

joshuaspence commented 7 years ago

+1 on this pull request. If S3 event notifications are supported then I will be able to use this plugin instead of logstash-input-s3sqs.

brandond commented 7 years ago

@joshuaspence you might check out this fork that I'm using to pull in CloudTrail logs: https://github.com/brandond/logstash-input-s3/tree/simple-sincedb

kureus commented 7 years ago

Hey @ph. What's the state of this PR? Is the original intent of it still valid and relevant? It looks like @brandond made progress towards getting it merged in #87, but it never made it into 5.0 beta. Is your existing todo list on this issue a good place to start in terms of contributing? Any context on Elastic's current view/plan for this plugin would be great. Thanks.

brandond commented 7 years ago

@kureus @ph I ended up doing basically a complete rewrite based on the threading fork, which I've been using in production for about 9 months. The branch name is probably not super accurate at this point, but you can find it here: https://github.com/brandond/logstash-input-s3/tree/simple-sincedb

It would probably take a lot of work to reconcile its configuration options with the current Elastic-provided module, but I'd be willing to put some time into it if there's interest.

suyograo commented 7 years ago

@brandond Thanks for all your work on this. We would support your efforts if you can spend time reconciling this PR with what you have.

We can probably open a new PR with your changes?

robgil commented 6 years ago

Might be cool to make a queue optional here and merge with this. Threading support + queue would be awesome.

https://github.com/logstash-plugins/logstash-input-s3sqs/tree/feature/logstash-6.x

jordansissel commented 6 years ago

I'm going to take this PR over from @ph <3

brandond commented 6 years ago

My (wildly divergent at this point) fork has been working well for the last year or so. The design goals may be a bit different though.

jordansissel commented 6 years ago

@brandond Good info! I'll take a look at your fork (https://github.com/brandond/logstash-input-s3/tree/simple-sincedb, right?) and see what I can combine here.

brandond commented 6 years ago

@jordansissel Yep, that's the latest.

The design goal for my fork was to 'tail' multiple prefixes. We have a bunch of stuff streaming into various buckets from multiple accounts. Within each bucket, files are created with an ///// structure. Under each / prefix, files are guaranteed to be created with ascending names - so you can 'tail' that prefix by remembering what object you last processed out of that prefix, and asking the S3 API to list everything after it. The plugin discovers top-level prefixes down to a configurable depth at startup, and then stores both the prefix and the name of the last processed object in the SinceDB.
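To make the 'tail' idea concrete, here is a small sketch using an in-memory stand-in for the bucket. It is not the fork's code: `FakeBucket`, `list_after`, and `tail_prefixes` are hypothetical names, and the `sincedb` is modeled as a plain dict from prefix to last processed key. Against real S3, the listing step would correspond to a ListObjectsV2 request with its `Prefix` and `StartAfter` parameters.

```python
from bisect import bisect_right
from typing import Dict, Iterable, List


class FakeBucket:
    """In-memory stand-in for a bucket; keys are kept sorted like S3 listings."""

    def __init__(self, keys: Iterable[str]):
        self._keys = sorted(keys)

    def add(self, key: str) -> None:
        self._keys = sorted(self._keys + [key])

    def list_after(self, prefix: str, start_after: str = "") -> List[str]:
        # Mimics a listing limited to `prefix` that starts after `start_after`.
        start = bisect_right(self._keys, start_after)
        return [k for k in self._keys[start:] if k.startswith(prefix)]


def tail_prefixes(bucket: FakeBucket, sincedb: Dict[str, str]) -> List[str]:
    """Return unseen keys for every tracked prefix and advance the markers."""
    new_keys: List[str] = []
    for prefix in sorted(sincedb):
        found = bucket.list_after(prefix, sincedb[prefix])
        if found:
            sincedb[prefix] = found[-1]  # remember the last object processed
            new_keys.extend(found)
    return new_keys
```

Because only keys after the stored marker are requested, each poll costs roughly the number of new objects rather than the size of the whole bucket. This relies on the ascending-name guarantee described above.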

There is some basic multi-threading support. I'd intended to have it process prefixes in parallel, but right now it just works through them one at a time, sequentially assigning objects to be processed by a configurable number of worker threads.
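The scheduling described here can be sketched roughly as follows. This is an assumption-laden illustration rather than the fork's implementation: `process_prefixes`, its parameters, and the dict of keys per prefix are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List


def process_prefixes(
    objects_by_prefix: Dict[str, List[str]],
    process: Callable[[str], str],
    workers: int = 4,
) -> Dict[str, List[str]]:
    """Work through prefixes one at a time; objects within a prefix run in parallel."""
    results: Dict[str, List[str]] = {}
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for prefix, keys in objects_by_prefix.items():
            # pool.map returns results in input order, and list() waits for
            # the whole prefix to finish before the next prefix starts.
            results[prefix] = list(pool.map(process, keys))
    return results
```

Processing prefixes in parallel, as originally intended, would mean submitting work for several prefixes before waiting on any of them, at the cost of tracking a separate marker per in-flight prefix.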

The handling of files that fail to process also needs work. Objects are failed after a fixed number of retries, and the SinceDB does not handle this well - it can cause it to grow out of control. I've only seen this happen once, but it did end up corrupting the DB file due to sheer size.

ph commented 10 months ago

I will close this PR; I currently have no time to push it forward. Feel free to make it your own.

eherot commented 10 months ago

Doing that as we speak. Thanks for the A+ framework though!