elastic / logstash

Logstash - transport and process your logs, events, or other data
https://www.elastic.co/products/logstash
Other
14.11k stars 3.48k forks source link

Reliability improvements for message delivery #2609

Closed suyograo closed 8 years ago

suyograo commented 9 years ago

Logstash currently does not provide end-to-end delivery guarantees. When a plugin fails to process an event, it does not signal to an earlier stage in the pipeline that an error has occurred. Also on successful message delivery, there is no acknowledgement to previous stages.

In the longer term, we plan to introduce optional mechanism for such notifications to give users an easier way to keep track events as the flow through the pipeline. Additionally, these acks will provide complete event resiliency (from source to destination) when used with message brokers like Apache Kafka and RabbitMQ. For example, we should track when an event enters Logstash and ack only when it successfully reaches a destination. Otherwise, the message is replayed by the source.

jordansissel commented 9 years ago

A non-exhaustive list of things to do:

jordansissel commented 9 years ago

@theflimflam has been exploring message delivery behaviors with what I see as "exactly one input to exactly one output" guarantees in a discussion on #2579.

I think we can provide a bit more general message delivery semantics across any inputs/outputs assuming those plugins use network transports that support acknowledgement or response.

heymatthew commented 9 years ago

@jordansissel slight correction, #2579 tries to specify "one-to-many inputs will ack after delivery to exactly one output". Configured inputs and outputs that aren't marked to behave like this will fall back on default behaviour and could be configured on the same node.

Simple example: A logstash node acks on delivery from lumberjack to elasticsearch.

Slightly more complicated: Ack on delivery from lumberjack to rabbitmq, and also deliver IRC output. Delivery or non delivery to IRC would not affect ack to lumberjack.

Another example: Ack on delivery from lumberjack to rabbitmq, and also receive from twitter input. Delivery or non delivery from twitter does not affect ack to lumberjack.

Waiting on feedback on the scope to proceed with #2579 work, but it seems like it achieves some of what this issue describes. The examples above are implemented against a PR on top of Logstash 1.4.

andrewvc commented 9 years ago

It seems that the key concern here is the possible reliability guarantees we can make in a distributed system (which logstash is).

Below I've listed my ideal goal list for Logstash in order of descending desirability and ascending simplicity.

Note: Unsurprisingly, the 'BEST' option gives us the same guarantees that Kafka can give us. From the Kafka docs:

So effectively Kafka guarantees at-least-once delivery by default and allows the user to implement at most once delivery by disabling retries on the producer and committing its offset prior to processing a batch of messages. Exactly-once delivery requires co-operation with the destination storage system but Kafka provides the offset which makes implementing this straight-forward.

jordansissel commented 9 years ago

+1 on figuring out where and how we can provide certain delivery guarantees. I have strong faith in the value of persistence queues, but those aren't guaranteeable either without constraints (if Logstash's hardware gets sucked into a black hole, presumably the persisted messages are lost :wink:). Even with distribution, there's still a constraint (what if all copies of a message are lost? What if Logstash fulfills a delivery guarantee and the receiving system falls is crushed by a monster truck?). The lesson I've learned from Kyle Kingsbury's wonderful database/CAP research is that honest and clear documentation is key. Let's figure out what we want, then how to provide it, and document exactly the constraints within which we can provide whatever delivery properties.

andrewvc commented 9 years ago

@jordansissel I agree that this is a hard problem to solve. In the short term I think we should aim for single-node/at-least-once. The key thing I'm aiming at here is: we should aim for a persistent local queue with transactional capabilities.

To do this we need to do more than just making the existing queues persistent. The issue is that just making the queues persistent does not solve the problem of a plug being pulled while a message has been pulled off the input->filter queue and has yet to be pushed onto the filter->output queue. To guarantee that nothing is lost once it gets into the pipeline we essentially must build end to end acknowledgements in some form (or at least the infrastructure to make that possible).

I think we need to something like Kafka, or to use something like RabbitMQ's acknowledgements. Essentially, we need a transaction between the pop and the push. Once we've done this we have essentially created a system that provides full end to end acknowledgements that a message has been processed at least once.

andrewvc commented 9 years ago

@colinsurprenant @jordansissel unless anyone objects I'm going to make a new branch with pluggable queues using the current SizedQueue interface. This shouldn't be much work, and it'll let us at least experiment with stuff while we have this discussion.

andrewvc commented 8 years ago

It looks like Tape is not a good option because it only works without corruption on yaffs.

Why they don't put this front and center on their docs is beyond me.

andrewvc commented 8 years ago

Chronicle is durable for system crashes, but not power outages. https://groups.google.com/forum/#!topic/java-chronicle/EJMnxsB-dC4

andrewvc commented 8 years ago

I've created a new branch to experiment on with support for pluggable message queues. This is just to test the waters. It's available here: ( https://github.com/andrewvc/logstash/tree/pluggable_queues ). The idea is to let us compare code in this thread.

This branch exposes a new option -x SizedQueue|ChronicleQueue, which lets you define the queue you want to use. It also reworks the pipeline to to call #ack on the queue after each message is processed. This will be important for durability since we need to acknowledge, and possibly replay events if they were mid-pipeline while processing.

I have a very hastily written (IE buggy/incomplete) PoC using chronicle here: https://github.com/andrewvc/logstash-chronicle-queue .

Please don't judge the code quality in either repo, this is just for experimentation. I just wrote the minimum to get events in one end and out the other. I'll try to round it out to something respectable this week.

Chronicle never deletes anything, and it is recommended in the FAQ to simply rotate queue files every so often (which is pretty easy to do) and should be much more performant. I have the code structure in place to do so, and it already internally supports resuming an unacked reads, it just needs more boilerplate to juggle files.

jordansissel commented 8 years ago

Acking is gonna get funky since it may cause friction with how outputs currently buffer things.

Example, using Elasticsearch output (w/ flush_size > 1)

If an unsafe shutdown (power loss, kernel panic, kill -9, etc) occurs between T2 and T10, event A is lost for the elasticsearch output.

Hypothesizing - Solving this without significant changes would require Stud::Buffer also using on-disk queues and acks. I think this means that there would be a roughly 6:1 ratio of disk writes to each event. (2 writes - once for the original event, once for the ack. Times 3 - one for each queue including the ES outptu Stud::Buffer). 6:1 is seems like not good. Ideal would be 1:1 but given filters can modify events, it is OK to have 2:1 if we cannot do better.

I've thought about this for a while and have worried about it. I've omitted many details above, so if I'm unclear on something, let me know. I am 100% in favor of on-disk queuing and also accepting the performance costs of doing so, but I'd like to steer towards minimizing the disk I/O cost if possible.

andrewvc commented 8 years ago

@jordansissel I share your same concerns. My goal here is to try stuff out experimentally and quantify it. If we find out that acking is indeed terrible for real-world perf, then we can chuck it. However, I think even ES is moving toward more fsyncs having discovered their cost isn't too bad. Chronicle was built to have a high write rate in exactly this scenario, so maybe it won't be so bad. Additionally, it would be nice to at least make it an option. We could turn it off by default if need be.

WRT the stud buffer stuff and batched outputs, I think further up in the ticket we discussed providing an explicit interface for batches and using a queue per output plugin. Basically, these plugins just get sent arrays of events in their preferred batch size (or a smaller array if too much time passes). The other nice property here is that this could actually improve thread liveness and throughput by decoupling the performance of the output plugins. However, like all things perf, I need to measure this to see how true this is.

ph commented 8 years ago

Really appreciate work done here!

During my vacation I was thinking about removing the output queues and only really use one queue between the inputs and the actual work, by work I mean (filter and the outputs).

I foresee a few issues with the multiline filters and how we manage the buffering in the output threads but it could simplify a few things with the recovery scenario and live configuration changes?

edited for typos

andrewvc commented 8 years ago

Thanks @ph!

@ph I think we should take another look at the rehydration strategy. In a previous convo, if I recall correctly, with @colinsurprenant @jordansissel and @suyograo , someone mentioned that there were some big problems there, maybe it was the multiline filter? At any rate, I think it's worth discussing again.

I think the central problem there is one of transactionality. If the pipeline had a solid notion of transactions there would be no issues, though it might cost performance.

I've actually been thinking of benchmarking SQLite as a queue. Its transaction support could be a real asset, and for small datasets like ours I think it could be blazing fast.

andrewvc commented 8 years ago

Thinking through things wrt rehydrating, given that filters can do their own buffering internally (like multiline), I'm even more in favor of a simple rehydration strategy. The tricky thing there is that determining when its safe to delete items in the input queue because more complicated.

What this boils down to is that having that level of persistence would require a full transactional system with cooperation from plugins. For that something like SQLite would make more sense than a traditional queue, but might slow down processing. That being said, chronicle gives you a good amount of control over transactions, so it might be sufficient.

andrewvc commented 8 years ago

Thinking about this a bit more, I think it just makes sense for individual filter plugins to manage their own state via an injected queue if they so desire. The complexity is too high to manage externally, especially given that what a plugin does can change between launches due to config changes.

andrewvc commented 8 years ago

Looking through the various APIs of various messages queues I think we should make the following changes to the internals of pipeline.rb to be able to maximize the plugability of logstash.

andrewvc commented 8 years ago

Last night walking home from the gym I had an idea for persistence. What we want is basically kafka, but only the producer + persistence, we actually don't actually need a lot of the consumer side. Essentially, Kafka solve two problems, safe producing, safe ordered consuming. By rethinking the consuming side a lot of opportunities were opened up. The nice thing about this is that the requirements here actually would let us use Elasticsearch as a backing store (though redis would also work well).

I started a PoC using Elasticsearch late last night and the results were promising (not too bad a speed hit, with great persistence guarantees). I'm hoping to round it out a bit over the weekend and share. I'm excited at this approach because it would mean you could effectively nuke the LS cluster with only a very tiny amount of data lost at the ingest point, which is really always a risk with most input protocols.

untergeek commented 8 years ago

How is ordered consuming guaranteed in this example?

andrewvc commented 8 years ago

@untergeek ordered consuming is actually not guaranteed now once you have multiple filters today (filters can take arbitrarily long). My PoC will have the same guarantees as logstash today. That is to say, a single threaded logstash will have the same ordering guarantees. Most of the stuff that requires ordering should be done at the input (like Multiline with the multiline codec), as correlating lines throughout an async pipeline becomes very difficult very fast.

I have had some thoughts around adding monotonic 'sequence' integers to events as they come in (combined with a UUID for the specific logstash instance). But that is mostly orthogonal to persistence work IMHO. However, I think a better approach would be to do more work on the client or within the input plugin.

andrewvc commented 8 years ago

@untergeek if you have any more detailed questions I'd be glad to chat, I just don't like pushing up code that's in a very rough state. The basic goal is at-least-once delivery, with the caveat that there must be some buffering on the input side (though this could be eliminated with a smart protocol, or a specially crafted kafka input). The key insight here is that by requiring the LS node that received the request to process it we avoid a lot of complexity and can lean on ES, which is very easy to administer, and already in use by logstash customers.

The concept is to create an elasticsearch index per worker and maintain a queue_offset document in elasticsearch. One nice property of this is that these queues are all sharded based on index, so as you add more worker threads latency can be lowered for the same throughput. Then, by smartly batching input requests via the bulk API (with a minimum write-time threshold), we can get good throughput to elasticsearch. Occasionally these queues would be disposed of fully as a form of GC. I created a special queue type based on java's SynchronousQueue that has delivery time guarantees, ensuring that nothing stays in-memory for longer than 200ms+es index time (and this is configurable).

Message acknowledgements are not synchronous, but rather periodically written to an offset document which is written per worker every n milliseconds or documents. Nodes can use this info to recover, though they may replay messages in some cases.

Recovery is a matter of checking the heartbeats of active queues and electing a leader to start recovering data (something easy to do with elasticsearch with document versions). Heartbeats can be accomplished by having each LS node watch documents in ES and notice if they don't change frequently enough from its own perspective (which gets around clock skew issues, it'll use System.nanoTime which is monotonic).

untergeek commented 8 years ago

Thanks for the in-depth explanation!

I guess my only other question is more Elasticsearch performance oriented.

How will a high volume of log lines being written to Elasticsearch, essentially 2x, affect the performance? With document deletes and the segment merges that must surely result, how will I/O be affected? How big a buffer would we want? If these affect Elasticsearch, would it be wiser to use a different data store (Redis, Aerospike, etc.)?

andrewvc commented 8 years ago

@untergeek I think you're 100% right to consider other datastores. I think Elastic has the best ops usability for our audience, but poor speed. The same algorithm could work with a pluggable backend that implements a small interface that could be implemented with Redis, Aerospike, Kafka, or even an SQL database. I do really like that with Elastic you get clustering guarantees and that all our customers already have it installed. Unfortunately, as you noted, its not a queue, and isn't going to be the fastest necessarily.

WRT segment merging, this strategy is about adding a small amount of documents to the index until they are fully acked then blowing away the index ( I need to figure out where a sweet spot is between the number of writes and the cost of index creation). I believe it is possible to disable segment merging by setting the right merge options. I haven't tried that yet, but that should buy more performance. We don't really care about read-back performance, because we'll never have a lot of stuff to read back (back-pressure FTW), and we only ever read back in recovery.

In preliminary tests I was getting pretty good enough perf from Elastic. I'm going to caution anyone reading this that these numbers were a quick test and are probably very wrong, but at least can establish a lower bound on performance. In a quick test locally I got a 3.5x throughput hit on an empty pipeline (generator in, one empty mutate filter, file out). Given that most people's bottlenecks are in filters and IO, that's actually not too bad (lowered my events/sec from 43113 to 15746). And, with some tuning I'm sure it could be improved even further (esp. by disabling segment merging). I'm pretty sure I can find some squeeze in tuning parallelism of writes and batch sizes as well.

With this strategy the biggest real performance overhead is ACKing. Journaling is basically not an issue as each worker thread has its own sharded writer. More workers=less latency and greater perf (so long as your DB is keeping up). The big knob to turn on perf is how often the process ACKs. If you're OK with potentially replaying a large number of messages it will run quite fast. If you ack after each message is filtered or output you bring the thing to a crawl.

untergeek commented 8 years ago

@andrewvc I think this is a great idea. I think that it'd be wise for us to find some way to write to local filesystem (i.e. our own format, or some kind of filesystem db, like sqlite or DB3/4—or even some binary hash—mostly for some semblance of table/columns for ACK). This would allow for those who don't want a secondary system (Elasticsearch, Redis, etc). Your idea for a pluggable system allows for this. I think this would allow us to do resiliency in a way that is very simple, too. This could be very important for users who have Logstash on a remote client that is forwarding to another Logstash, a broker of some kind, etc. They'd need to have a local system, if not the broker.

andrewvc commented 8 years ago

@untergeek that makes sense to me. @colinsurprenant is hard at work on a really cool format, which when I last checked was an MMAPed circular buffer, which should be lightning fast.

I think its up to our users to decide what their durability requirements are. Some might like a centralized system, others might prefer a holding data on their LS instances.

andrewvc commented 8 years ago

We're moving the party over to #3693