Closed andrewvc closed 8 years ago
This document builds on the discussion and work started by many others on the logstash team and attempts to tackle some of the pipeline level concerns (vs. storage specific implementations) that impart persistence. This document starts from the premise that we want to fully process each message at-least-once, or optionally at-most-once, and proposes a system that can be configurably at-least-once or at-most-once, while maintaining good performance. Additionally, adding in persistence in a future-friendly way opens the door for a number of new features. Remember you can't have exactly once delivery.
The key to moving forward is reframing our relationship to Logstash inputs. Let's examine the relationship our inputs have with logstash:
Each input protocol has its own expectation of what a 'delivered' message is. Kafka, on the one hand is paranoid and will go to extreme lengths to make sure you do the job. Syslog, on the other, just doesn't care, it throws the message over the wall and walks away. Fortunately, there's a good way to make BOTH of them happy. Luckily, we don't need to rewrite the internet.
The key insight here is that almost all of our inputs are like Syslog, and maybe few are like Kafka (e.g. lumberjack maybe?). To fix this problem we need a pipeline that does the following:
The accomplish this we need to rethink the current pipeline. As it stands, the pipeline looks like:
input -> queue(size 20) -> filters * n threads -> queue(size 20) -> output thread
Two different architectures seem to be needed here:
# For syslog style inputs
/> Thread1 -> Journal(async) -> filter -> synchronized output -> ack(set journal offset)
input -> RoundRobinQueue -> Thread2 -> Journal(async) -> filter -> synchronized output -> ack(set journal offset)
\> ThreadN -> Journal(async) -> filter -> synchronized output -> ack(set journal offset)
# For Kafka style inputs
/> Thread1 -> filter -> synchronized output -> ack(set kafka offset)
input -> PartitionPerThread -> Thread2 -> filter -> synchronized output -> ack(set kafka offset)
\> ThreadN -> filter -> synchronized output -> ack(set kafka offset)
Implemented in pseudo-ruby, the syslog friendly queue would look like this:
# Zero capacity queue, only for load balancing between threads
roundRobinQueue = new SynchronousQueue();
# Make sure we coalesce in one thread at a time
coalesceMutex = Mutex.new
# Start consumers
workers.times do
Thread.new do
coalesceMutex.synchronize do
msgs = [roundRobinQueue.take]
# Wait no longer than 5ms for subsequent messages
# and do this no longer than 19 more times to put a cap
# on time spent before journaling
19.times do
msg = roundRobinQueue.poll(5, :milliseconds)
break if msg.nil? # Nothing came in 5ms
msgs << msg
end
end
# In the background, journal the input
journalAsync(msgs)
# Run all filters
processFilters(msgs)
# Since outputs in LS are single threaded, synchronize this
# Also, by using mutexes here one slow output won't block a fast one vs. a separate output queue as we have now.
outputs.each {|output| output.mutex.synchronize { output.process(msgs) } }
# Update the journal offsets to indicate that it can discard
# the journaled messages. This could happen on any schedule, rather than after each batch.
# The longer between acks, the more messages that are replayed
# from the journal in the event of an unclean shutdown.
journalUpdateOffsets()
end
end
# Take in produced values
ingest(roundRobinQueue)
This strategy has a number of benefits.
A kafka friendly queue would use the Kafka SimpleConsumer
to gain greater control of offset handling. This is basically what Kafka wants you to do from the Kafka docs.
consumer = LSKafkaConsumer.new;
# Thread per partition
consumer.consume {|partition|
offset, msg = partition.take()
# Run all filters
processFilters(msg)
# Since outputs in LS are single threaded, synchronize this
# Also, by using mutexes here one slow output won't block a fast one vs. a
# separate output queue as we have now.
outputs.each {|output| outputMutex.synchronize { output.process(msg) } }
consumer.updateOffset(offset)
}
It's clear at this point that I really like Kafka (and who doesn't?), but most people don't want to set it, and Zookeeper up. The good news is, that for our purposes, we can create a Kafka-Lite by implementing our own journal in our database of choice, and this can be pluggable using almost any database backend we want (colin's Jruby MMAP, Elasticsearch, Kafka, Redis, PostgreSQL, RabbitMQ, etc.).
We can even make a pretty snappy clustered version of it using our old friend, Elasticsearch, which nearly ever Logstash user has. Now, we could use Redis, we could use whatever DB you like, but since ES is clustered this would fit nicely in with a future Logstash-cluster story.
Now, you may be quietly yelling to yourself about ES not being a message queue. This is true. But it's pretty damn fast at being a message queue given the following setup:
binary
Marshal.dump([event1, event2,...])
. This is even faster than the bulk API!Doing this I was able to get to ~ 55% of native speed. And this is on code that I've only played around with a short bit, more optimizations should be possible (esp. rewriting it in Java, which @colinsurprenant already demonstrated as worthwhile).
This all opens up the door to an interesting clustering opportunity. In fact, using ES in this way demands it. We must have some way of recovering these journals with only the state given in ES. There are multiple ways to do this, but the simplest way I can think of is to have each worker periodically send a heartbeat to ES, updating a doc saying "I'm still working on this shard".
Clock skew always is a problem in distributed systems, but we don't need accurate clocks here. As long as each node has a working System.nanoTime (which is monotonic, no clock resets affect it), it can watch all the other nodes' heartbeats, and if one hasn't changed in enough time, it can participate in a leader election via the ES document versions feature to take on the task of replaying it.
There's also room in here for shared config management, but that's for another day of course.
Currently we do batching internally (like the ES output). We could add a new 'ack' API into the outputs that lets them continue to perform their own internal buffering, and ACK when complete. Alternatively, we could build this batching functionality directly into logstash (which does a better job of protecting us from potentially broken plugins). I prefer the output batching method. Batching saves us from bad plugin bug reports when an author never calls ack.
A key improvement that could be made would be thread-safe concurrent outputs. To make this work correctly with the non-concurrent outputs today, each thread would need to flush the outputs before ACKing to ensure writes happened. I have not yet tested this, but I doubt there will be a significant perf impact here, esp. after we tune the worker buffer and latency values.
It makes logical sense, considering these competing priorities, to define multiple pipelines per config, one KakfaPipeline
one RegularPipeline
(regular being the syslog style). This would entail nothing more than adding a top-level pipeline
concept in the config, that could take its own options. A bonus here would be letting users group filters with pipelines vs. using if
blocks to figure out which filters get executed for which inputs. This would allow a user to setup a kafka pipeline, for instance, that would have a very different internal implementation than a syslog
pipeline. Additionally, making this pluggable would allow for rich experimentation.
Taking this even further (maybe in the distant future?), we could add the ability to have pipelines feed pipelines, essentially making a network of pipelines as a DAG, with transactions carrying through end-to-end. This would give users quite a bit of power in composing filters without having to use if
blocks.
Another new possibility this strategy of end-to-end ACKs opens up is threadsafe multi-event filters/codecs (like multi-line). This could work by essentially having the filters implement a sort of ACK barrier via a special message type. In other words, a multi-line filter could force the system to only ACK. in discrete multi-line chunks. If the system were to stop then restart, the journal would replay from the last successful multiline. Here be dragons, but it could prove useful for the future.
This would require having the top level pipeline take a 'partition' argument, that specified which fields identified a unique log source. It could use sprintf, and use something like '%{host}/%{service}', to identify apache log lines on a given service. The tricky thing here is that the pipeline would need to create a journal per 'partition'.
There's a lot of ideas here, but the key things I think we should implement are:
@suyograo mentioned the issue of side-effects during replay in filters being undesirable. Where filters perform external IO this can be a huge problem when things get replayed.
Currently LogStash is at-most-once
meaning you'll never see duplicate effects. This pipeline can be made to act similarly by moving the acknowledgements to happen before execution. This can result in lost messages however.
I think a more robust solution is to preserve at-least-once
semantics and to provide each message with a unique ID (which is already required by most persistence solutions), perhaps "WorkerUUID-sequenceId", which will be required by most database storage options anyway. This would allow those plugins to perform idempotent operations.
It looks like the RabbitMQ java client supports batch ACKs, though it isn't well documented. If this proves to be workable that could mean that RabbitMQ inputs wouldn't need need managed persistence, just like Kafka.
@andrewvc nice write up!
This would entail nothing more than adding a top-level pipeline concept in the config, that could take its own options. A bonus here would be letting users group filters with pipelines vs. using if blocks to figure out which filters get executed for which inputs.
Do we need to expose users to the implementation details? IMO, this should be transparent -- if we are using a kafka-esque :) input, we use the native acking..
@suyograo @andrewvc +1 on only persisting on input. Assuming filters are stateless, replaying on failure is not an issue and will reduce the resources needed for further persisting between filters and outputs. Realistically, failures will be few and far between :) so I don't think we should worry about the extra cpu time to redo the few computations that were happening in-memory.
@suyograo thanks! was thinking we could make the pipeline
concept optional. In other words, all configs would be implicitly wrapped in a pipeline
statement to make it backward compatible. If you want to use multiple pipelines it'd look like:
pipeline {
name => "mykafka_pipeline"
type => 'kafka'
zk_connect => ...
kafka_url => ...
input {
consumer_group { name => "foo"}
}
filter { ... }
output { ... }
}
pipeline {
name => "Some other pipeline"
...
}
The main reason is that for a Kafka pipeline you couldn't use, say a file
input, it wouldn't make sense. Kafka would be the pipeline, not the input .I assume the input blocks would contain some new input type, probably a consumer_group
input that only works within the Kafka pipeline. A RabbitMQ pipeline would work similarly. An added benefit here is this may be a nicer way to organize config files without if conditions since the filters and outputs would be scoped per-pipeline. Now, a downside here is that it would require repeating some config options, say your Elasticsearch credentials. I do think, however, that we could come up with some sort of shared definition there to fix that.
Regarding replaying filters which @suyograo, I think the performance implications are a valuable thing to consider, and I think there's a good story here. One idea is to just make the buffer size dynamically calculated based on the actual system throughput. The system could calibrate itself to have at most 2 seconds of data in buffers based on throughput. A larger issue here, however, is that end-to-end acknowledgements become much trickier if we insert a second persistence stage. In the case of Kafka or RabbitMQ that means having to configure secondary storage which negates a lot of the benefit there.
I think we should keep in mind that most filters are fast, and users will be able to tune this parameter even if we don't release a v1 with no auto-calibrating buffers. If they only want to keep 10 items max live in the pipeline they can do that and accept the performance cost. If they truly have filters that are extremely slow they won't benefit from having large buffers anyway, and more backpressure will be a good thing for those users.
Regarding filter idempotence I had a further thought. We can add a special field on replayed items, so that filters / outputs can handle replayed items differently if they want.
I would like to introduce the idea of a LMAX Disruptor based Queue. This is a lock free ring buffer with one publisher inserting events ahead of the consumers.
Consumers handle events sequentially. For example: [][][P][C1][e5][e4][e3][C2][e1][][]
, C2 can only catch up to C1 and C1 can only catch up to P and P is inserting e7, C1 is handling e6 and C2 is handling e2.
I imagine that C1 will be the MMAP persistor and C2 is the downstream handler (filter or output). Later we can easily add a metrics handler after C2 that can process metrics and dump them for analysis. This threaded metrics handler (consumer) will not affect performance. Other such consumers can be added at will.
Further, I can see just one buffer for the whole pipeline.
For example: [input][persistor-i][filter][persistor-u][output][persistor-d][metrics]
where:
Thoughts?
@guyboertje my understanding (which could be wrong, I like to learn more!) of Disruptor is that its performance comes from three features: First, that all objects are of the same fixed size (for cursors that can jump around quickly). Second, that all objects are stored whole within the queue and do not link/refer to memory outside of the queue (for cpu cache-friendliness). Third, that the queue must be large, like gigabytes (possibly for filesystem cache friendliness and less contention between publishers/consumers?).
Assuming my understandings are accurate, which again, they may not be -- For the first (same fixed size), we cannot today achieve this with LogStash::Event because our objects are so fluid in schema. For the second, because our events are unpredictable in schema and size, we cannot respect the first point (fixed size) and still respect the second point (store wholely in the queue). For the third, we could do this, but I'm not sure we want to force large queue spaces by default.
My comment thus far is based on my memory from reading the Disruptor paper many years ago, so my memory as well as my understanding of Disruptor is probably inaccurate - let me know ;P
That said, I am totally open to us exploring Disruptor as our internal queue mechanism.
@jordansissel - I think in general you are correct. However, given the slow (treacle) speeds of our pipeline elements compared to trading systems we would be leveraging the queue semantics instead of the raw speed. I like the idea of being able to insert other consumers at will, ones that don't necessarily come from the config.
@guyboertje ahh, excellent point! +1 on working towards the semantics of Disruptor. Agreed that Logstash probably won't be having microsecond-long transaction requirements like high-speed trading might ;P
I like the train-line like metaphor of trains waiting outside a station for the previous one to leave. I don't think we need millions of buffer entries, maybe 1024.
@guyboertje interesting idea with LMAX! I'm a big fan of the work the LMAX guys have done. Have you thought about how the LMAX architecture would deal with high-latency transactions? Logstash filters can be quite slow and even perform blocking IO in some cases.
Additionally, have you thought about how this fits in with multiple outputs and output batching? Higher up in the thread I discuss the importance of ACKing in relation to batching and replay. As far as I understand it LMAX is oriented toward single event processing, not micro-batching.
Lastly, what's the parallelism story here? If I start logstash with -w 4
will that start 4 disruptors I take it?
@guyboertje Additionally, what are your thoughts on pipeline rehydration vs. multi-stage persistence . So far in the thread the discussion seems to be leaning toward rehydration since replaying unacked messages is essentially free. Why incur the overhead of 2x persistence?
Lastly, one concern I have with lmax is that it's really geared toward local storage only with hot mirrors, there's a complex clustering story there. I think people may really like using remote storage (such as elasticsearch) to simplify administration vs. the complex failover scenario LMAX disruptors have wrt ops.
I have my concerns as to how well it fits with a clustering story.
@andrewvc - the actual Disruptor code may not be suitable. I am suggesting we consider the pros and cons of its architecture. There may be other implementations that are operationally similar without the 'mechanical sympathy' angle.
I would like the ability to add a metrics consumer after the output has completed. I have a dream (for performance and regression verification) that one day we will be able to run various configs through an instrumented pipeline that reports state and metrics and be able to compare the actual reported metrics against an expected set perhaps storing the config, input data and expectations in ES.
Just on the micro second transaction comment, we should take this into account anyway.
With the rise of networking datasets being pushed through LS (and probably Beats) I think it is definitely applicable. Plus I have seen logging use cases where nanosecond res was required.
@markwalkom +1 on that, though I think we can discuss microsecond/nanosecond time resolution different from microsecond/nanosecond processing latency through Logstash. I'd certainly like to move towards nanosecond processing latency ;)
Fair point :)
At all hands in Barcelona, I mentioned to @colinsurprenant - about Google Flatbuffers and how Facebook used them. In a nutshell the Flatbuffer is an immutable representation of a Hash like structure where field access is traversable without full decode. Facebook stored mutations to the original as a Flatbuffer too.
A mutation entry specifies the change that one filter made to the original or another mutation. Serialisation of the final event in an output would mean working backwards through the mutations and original entry building the final JSON string. Mutations would be persisted too but not necessarily in the same file as the original.
I'm not suggesting that we use Flatbuffers directly, but that this idea allows for skipping replay of filters that have been processed already. However, we must be mindful of dynamic config changes. If a filter definition was removed, modified or added after some mutations were recorded we would need to delete the mutation (and cascade if other mutations were dependent) applicable to that filter and replay.
WDYT?
@guyboertje I'm a huge fan of anything we can do to move toward immutable datastructures (we'll have to see about performance though, as always). This seems like it'd a great avenue to explore after @colinsurprenant gets a bit farther with the java event stuff maybe?
WRT dynamic config changes I don't think we have any plans to do those 'live', we'd definitely stop the pipeline then restart it, which would obviate the issues you mention.
@andrewvc - regarding pipeline restarts, its extent is not yet in stone. I expect that the inputs would be stopped while the backlog clears through before a restart is initiated, and this co-ordinated across a cluster.
@guyboertje You're right that it's not set in stone, though for our cluster config reload design we will be stopping and starting the pipeline for simplicity.
That being said, a small amount of downtime when reloading configs is probably a negligible cost for users that removes quite a large amount of complexity. Additionally, it will probably be easier to minimize downtime and load by simply building in a rolling restart at some point.
/cc @jsvd
@andrewvc @suyograo I think we can close this issue and reopen new issue(s) in the context of the new pipeline and java event. thoughts?
This is already in 2.2 mostly, will follow up in a new issue if need be.
This discusses next steps in persistence, continuing from #2609 .
Summing up a conversation @colinsurprenant and @suyograo had this morning.
Colin's work on Jruby MMAP Queues is going well. I took the time to look at some other queues out there, including deep dives in both Tape and Chronicle. Both have their issues, and I fully agree with @colinsurprenant 's approach with the custom queues.
I performed my own work on pipeline level refactors to deal with latency and input semantics related to persistence. It was agreed during the meeting that we should introduce pluggable pipelines to test out new concepts and experiment with them. Earlier, pluggable queues had been discussed, but after looking into it the Queue tends to be tightly bound to the pipeline, so that is the logical place to make a pluggable abstraction.
I made a number of notes myself on a new pipeline prototype that focusses on reducing latency while improving throughput. I will provide notes on that below in a comment once I have written out all my thoughts.