andrewvc opened this issue 9 years ago (status: Open)
I agree the callback mechanism makes it awkward to test.
Given the current state of things there are some tricky compatibility issues here. I initially hoped that there might be some way to create an implementation of `#encode_sync` in the `Codecs` base class, but looking around, that doesn't look doable in a non-hacky way that wouldn't compromise performance.
I think the best path forward would be to just start adding this method on a plugin-by-plugin basis, instead of formally putting it in core at the moment. There aren't that many codec plugins, so it's doable.
@jordansissel @ph What are your thoughts?
@andrewvc I agree with you that we should do it on a plugin-by-plugin basis and keep the old method for backward compatibility. Also, as soon as we update a plugin to use the sync method, we need to make sure to update its dependencies to a specific version of `logstash-core`.

But by forcing a plugin to use a recent version of `logstash-core`, we will prevent people running an older 1.5.X from using the latest version of the codec.

So maybe we should target this change for 2.0?
@ph agreed on 2.0. Just tagged it as such.
@jordansissel @ph if it's a 2.0 target, should we just alter `#encode` to be synchronous exclusively? Are any codecs actually dependent on the async behavior?
The only case I can think of is something using EventMachine or similar to do some external IO, which should probably be in a filter.
@andrewvc Well, if we target 2.0, it's a good time to just change the behavior and drop backward compatibility.
Also, 2.0 is a good time to see if we could benefit from redefining the codec to be more streamy; I am thinking about compressed files here that could be decoded as a stream.
We need to revive this issue in light of the new NG pipeline. If codec encoding has an async contract, we can't guarantee backpressure, and we can't guarantee that we've persisted to disk safely.
The compress-spooler codec is particularly problematic. It is the only codec we have (I checked the source of all of them) that has its own internal buffer and does truly async encoding. There is no way to make this work with persistence. We need backpressure, and we need to do it through the callstack.
@guyboertje 's proposal https://github.com/elastic/logstash/issues/4432 for inputs has been mentioned as a possible approach here, but I'm not sure it's a workable approach for outputs/codecs because we need backpressure to always be synchronous. I'd love your feedback here @guyboertje. AFAICT we should just chain function calls, because asynchronous contexts and backpressure are not going to be used here.
Proposed interface to make `encode` synchronous:
```ruby
class HypotheticalJsonCodec < LogStash::Codec
  # For a hypothetical JSON encoder
  def encode(event)
    encoded = event.to_json
    @on_event.call(event, encoded) if @on_event # Backwards compatibility
    [event, encoded]
  end

  # This is important for codecs like compress spooler.
  # This will let them use the external pipeline buffer.
  def multi_encode(events)
    events.map { |event| encode(event) }
  end
end
```
```ruby
class HypotheticalHTTPOutput < LogStash::Output
  def multi_receive(encoded_events)
    encoded_events.each do |event, encoded|
      # @url is the configured URL template (assumed)
      send_request(url: event.sprintf(@url), body: encoded)
    end
  end
end
```
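To make the flow concrete, here is a minimal, self-contained sketch of a synchronous pipeline pass under this kind of interface. The `JsonCodec` and `CollectingOutput` classes are illustrative stand-ins, not real Logstash plugins; backpressure falls out of the plain call stack because every call blocks until the output returns.

```ruby
require "json"

# Hypothetical stand-ins for the codec and output classes above.
class JsonCodec
  def encode(event)
    [event, JSON.generate(event)]
  end

  def multi_encode(events)
    events.map { |event| encode(event) }
  end
end

class CollectingOutput
  attr_reader :written

  def initialize
    @written = []
  end

  # Blocks until every event is "written"; callers therefore
  # experience backpressure through the ordinary call stack.
  def multi_receive(encoded_events)
    encoded_events.each { |_event, encoded| @written << encoded }
  end
end

codec = JsonCodec.new
output = CollectingOutput.new
output.multi_receive(codec.multi_encode([{ "msg" => "hello" }]))
puts output.written.first # => {"msg":"hello"}
```

A slow `multi_receive` here naturally slows the dequeue loop upstream, which is the backpressure guarantee being discussed.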
@andrewvc - with components, the call chain is synchronous, as is the return unwind, so we just need to pass a flag back on the return unwind.
@guyboertje can you post a code sample of what that would look like for an encode from an output plugin author standpoint?
@andrewvc - sure, if you first post a sample output that wishes to communicate backpressure upstream :smile: I will show how components can facilitate this backpressure communication.
@guyboertje can you use the hypothetical ones I listed above? All outputs should now exhibit backpressure since they all by definition perform IO.
Ahh - then synchronous blocking back-pressure is inherent in the call chain.
Now the discussion turns to when and how the batch elements are handled, aka de-batching. Either an output can natively write bulk or it can't. If an output can't write bulk, then each event + encoded data needs to be sent to the output individually, i.e. de-batched. We can de-batch either at the encode component or at the output component. In either case we have blocking back-pressure.
De-batch at the encode component:
```ruby
class DebatchingEncoderComponent
  include Component

  # The pipeline will dequeue a batch and call
  # accept here with a context and data (batch).
  def add_codec(codec)
    @codec = codec
    @codec.on_event do |event, encoded|
      # De-batch here.
      # If downstream blocks then this will too.
      deliver(event, encoded)
    end
    self
  end

  # Will we need a context?
  def do_work(context, data)
    # Here data is a batch of events.
    data.each do |event|
      # If on_event blocks then this call will too.
      # If I block here then the pipeline dequeuing
      # call to my accept method will block too.
      @codec.encode(event)
    end
  end
end

class OutputComponent
  include Component

  def add_output(output)
    @output = output
    self
  end

  def accept(event, data)
    @output.receive(event, data)
  end
end
```
De-batch at the output component:
```ruby
class BulkEncoderComponent
  include Component

  # The pipeline will dequeue a batch and call
  # accept here with a context and data (batch).
  # If downstream blocks then accept blocks
  # and the dequeue loop blocks.
  def add_codec(codec)
    @codec = codec
    @codec.on_event do |event, encoded|
      # This call does not block.
      assemble(event, encoded)
    end
    self
  end

  # Will we need a context?
  def do_work(context, data)
    @assembled = []
    # Here data is a batch of events.
    data.each do |event|
      @codec.encode(event)
    end
    # If downstream blocks then this blocks too.
    deliver(context, @assembled)
  end

  def assemble(event, encoded)
    @assembled.push([event, encoded])
  end
end

class DebatchingOutputComponent
  include Component

  def add_output(output)
    @output = output
    self
  end

  def accept(context, data)
    # Here data is an array of [event, encoded].
    # If this call blocks then upstream deliver will block.
    data.each do |event, encoded|
      @output.receive(event, encoded)
    end
  end
end
```
Output that natively handles bulk write:
```ruby
class BulkEncoderComponent
  include Component

  # The pipeline will dequeue a batch and call
  # accept here with a context and data (batch).
  # If downstream blocks then accept blocks
  # and the dequeue loop blocks.
  def add_codec(codec)
    @codec = codec
    @codec.on_event do |event, encoded|
      # This call does not block.
      assemble(event, encoded)
    end
    self
  end

  # Will we need a context?
  def do_work(context, data)
    @assembled = []
    # Here data is a batch of events.
    data.each do |event|
      @codec.encode(event)
    end
    # If downstream blocks then this blocks too.
    deliver(context, @assembled)
  end

  def assemble(event, encoded)
    @assembled.push([event, encoded])
  end
end

class BulkOutputComponent
  include Component

  def add_output(output)
    @output = output
    self
  end

  def accept(context, data)
    # Here data is an array of [event, encoded].
    # If this call blocks then upstream deliver will block.
    @output.multi_receive(context, data)
  end
end
```
Using components will separate the concerns of encoding, output writing, and batch handling.
Also, we can change the codec to use a listener callback instead of a block:
```ruby
class BulkEncoderComponent
  include Component

  # Modified codec to use a listener-based callback;
  # the codec will call process and not a block.
  def add_codec(codec)
    @codec = codec.add_listener(self) # add_listener returns the codec (self)
    self
  end

  # Will we need a context?
  def do_work(context, data)
    @assembled = []
    # Here data is a batch of events.
    data.each do |event|
      @codec.encode(event)
    end
    # If downstream blocks then this blocks too.
    deliver(context, @assembled)
  end

  def process(event, encoded)
    @assembled.push([event, encoded])
  end
end
```
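For completeness, the codec side of that listener contract might be sketched like this. Everything here is an assumption for illustration — `ListenerJsonCodec`, `add_listener`, and the `process` method name are implied by the component above, not an existing Logstash API:

```ruby
require "json"

# Hypothetical codec side of the listener contract: instead of
# yielding to an @on_event block, the codec calls
# listener.process(event, encoded) synchronously.
class ListenerJsonCodec
  def add_listener(listener)
    @listener = listener
    self # returning self lets callers chain: codec.add_listener(component)
  end

  def encode(event)
    @listener.process(event, JSON.generate(event))
  end
end

# Tiny listener used to demonstrate the contract.
class CollectingListener
  attr_reader :pairs

  def initialize
    @pairs = []
  end

  def process(event, encoded)
    @pairs << [event, encoded]
  end
end
```

Because `process` is an ordinary synchronous method call, blocking in the listener blocks `encode`, preserving the back-pressure property discussed above.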
I would think that the chain of components is assembled in the output and not in the pipeline, because the output config is how the user communicates the desired output behaviour; the pipeline should not be concerned with this.
@guyboertje thanks for the thorough examples! Now that I have a clearer idea of what we're doing, I think your proposal is to some extent orthogonal to what I'm discussing (I think the components are good, but I'm actually talking about something different).
The main thing I'm interested in fixing here is the async nature of codecs. Additionally, I think we should add a synchronous `multi_encode` interface to codecs (which would fix the issue with `compress_spooler` needing to be async; it is the only buffering codec). This would also make the implementation of your `BulkEncoderComponent` much simpler, not needing its `@assembled` array.
WRT the pipeline handling the encoding: one advantage would be that, in the case where two outputs use the same codec, the pipeline would only encode once. I would say that is a niche case, however.
I have one final question about components in outputs: what new use cases would they enable?
@andrewvc - I can't see that `compress_spooler` is async. Sure, it buffers and, for normal operations, the eventual compressed data will be synchronously given to the `on_event` callback during the encode call of event N where N >= `@spool_size`. This is fine; every call to accept on a component does not have to result in a call to deliver. The only possibly async bit is the flush on close, because this is called by the main pipeline thread. But here's the thing: in the 2.2 snapshot, I can't find any outputs that call `@codec.close` or `@codec.flush`, so `compress_spooler`, if used today in outputs, will lose events. Further, I can't see whether, if at all, flush is called on codecs in outputs periodically.
Compress spooler also breaks the `on_event` contract by offering one arg to the block. The output receives `(compressed, nil)` and will write `nil` out.
```
irb(main):001:0> l = ->(a,b) { puts a, b }
=> #<Proc:0x3122b117@(irb):1 (lambda)>
irb(main):002:0> l.call(3)
ArgumentError: wrong number of arguments (1 for 2)
	from org/jruby/RubyProc.java:267:in `call'
	from (irb):2:in `evaluate'
	from org/jruby/RubyKernel.java:1079:in `eval'
	from org/jruby/RubyKernel.java:1479:in `loop'
	from org/jruby/RubyKernel.java:1242:in `catch'
	from org/jruby/RubyKernel.java:1242:in `catch'
	from /Users/guy/.rubies/jruby-1.7.22/bin/irb:13:in `(root)'
irb(main):003:0> def ll(&block) block.call(42); end
=> nil
irb(main):004:0> ll {|a,b| puts a, b }
42
=> nil
```
Hmmm.
The json codec:
```ruby
def encode(event)
  @on_event.call(event, event.to_json)
end # def encode
```
The compress_spooler codec:
```ruby
def encode(event)
  # use normalize to make sure returned Hash is pure Ruby for
  # MessagePack#pack which relies on pure Ruby object recognition
  @buffer << LogStash::Util.normalize(event.to_hash).merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)
  # If necessary, we flush the buffer and get the data compressed
  if @buffer.length >= @spool_size
    @on_event.call(compress(@buffer, @compress_level))
    @buffer.clear
  end
end # def encode

def close
  return if @buffer.empty?
  @on_event.call(compress(@buffer, @compress_level))
  @buffer.clear
end

private

def compress(data, level)
  z = Zlib::Deflate.new(level)
  result = z.deflate(MessagePack.pack(data), Zlib::FINISH)
  z.close
  result
end
```
In summary: in practice, today, we don't have working async codecs in use in outputs.
> WRT the pipeline handling the encoding: one advantage of the pipeline handling this would be that in the case where two outputs use the same codec the pipeline would only encode once. I would say that is a niche case however.
In `output_batch`, a subset of the events is built per output via `output_func`; if and only if two outputs have the same codec and the same subset can we encode that subset once. The more complex the config, the less likely users will see this speed boost when it is most needed.
> I have one final question about components in outputs, however, what new use cases would they enable?
Acking back to the persistent queue, maybe; e.g. the tail component is an AckingComponent and comes after the output component. Metrics-link components.
Remember, this is about separation of concerns: let codecs encode, outputs write, and components link/communicate.
@guyboertje WRT acking back to the persistent queue, my question is why that needs to be explicit. In my mind, if we have a synchronous API we just need to make sure `output.multi_receive(codec.encode(data))` returns, and we can consider it acked. My question there is about why we need the extra ceremony of components.
@andrewvc - What do you think of making the 'batch' that we pass around into an object that wraps the initial array? This Batch object can then hold some state and have behaviour.
Imagine a batch size of 10, a spool_size of 15, batches A, B, C, and the CompressSpooler codec. Batch A is given to the codec, which buffers the batch object. The call returns and the batch state reports that 0 events were processed. Batch B is then given. All the buffered batches (Batch A) are merged with Batch B, after 5 events are taken from Batch B and put in a new Batch D that is buffered. Batch B is compressed and given to the output. When the output has sent the compressed payload, it marks Batch B done (15 events); the call stack unwinds all the way back and we ACK those 15 events. Now Batch C is given to the codec; the 5 events in the buffered Batch D are merged with Batch C so it has 15, the call succeeds, and Batch C reports the 15 events as processed. Batch D is discarded.
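A minimal sketch of such a Batch wrapper might look like the following. The class name, the `processed` counter, and the method names are assumptions for illustration, not an existing Logstash class:

```ruby
# Hypothetical Batch wrapper: wraps the raw event array and tracks
# how many of its events have been fully handled, so the pipeline
# can decide what to ACK as the call stack unwinds.
class Batch
  attr_reader :events, :processed

  def initialize(events)
    @events = events
    @processed = 0
  end

  # Called by a codec/output component when it has fully handled
  # some number of events (possibly from a merged earlier batch).
  def mark_processed(count)
    @processed += count
  end

  def done?
    @processed >= @events.size
  end
end
```

In the scenario above, Batch A would report `processed == 0` after its first pass, and only be acked once a later call marks its events processed.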
> WRT acking back to the persistent queue
I understand the synchronous return of success or failure as a mechanism to consider a batch acked. Regarding components, I am thinking that an AckingComponent could synchronously call across to the PQ to actually ACK, but it may be as simple as holding the logic that 'finalises' the batch instance before the call unwind. At least, if you do a synchronous ACK call to the PQ close to the point of transmission, then if anything goes wrong in the return path (metrics, logging) the events are still acked. Also, if you do a synchronous ACK call to the PQ when and only when the transmission occurs, you don't have to worry about Batch A returning early from the CodecComponent.
@guyboertje I think your design is good, but I believe it solves a different problem than what we have here.
The whole reason we need to spool is because the old pipeline processes events one at a time. With the new pipeline, the compress spooler needn't spool; the batch size would == the spool size. The codec would just provide a `multi_encode` method that compresses the whole batch at once (at that point the pipeline is doing the work of spooling for it).
Once we do that, we can just keep things simple, as in my example above with `output.multi_receive(codec.multi_encode(events))`.
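A sketch of what such a batch-oriented compress codec could look like under this proposal. This is hypothetical — the real compress_spooler has no `multi_encode`, and MessagePack is swapped for JSON here just to keep the sketch stdlib-only:

```ruby
require "zlib"
require "json"

# Hypothetical batch-oriented compress codec: the pipeline's batch
# replaces the internal spool, so the whole batch is compressed in
# one synchronous call and nothing is buffered across calls.
class BatchCompressCodec
  def initialize(level = Zlib::DEFAULT_COMPRESSION)
    @level = level
  end

  # Returns a single [events, compressed] pair for the whole batch,
  # matching the [event, encoded] shape of the proposed interface.
  def multi_encode(events)
    payload = JSON.generate(events.map(&:to_h))
    [[events, Zlib::Deflate.deflate(payload, @level)]]
  end
end
```

Because nothing is held between calls, there is no flush-on-close problem and no way for a crash to strand half-spooled events.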
WRT errors in the return path, we can guard against that with a `begin/rescue/end` block.
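Such a guard might look roughly like this. Every name here — `process_batch`, `queue.ack`, the `metrics` callable — is a hypothetical stand-in, not a real Logstash API:

```ruby
# Sketch: ack as soon as the output call returns, and keep
# return-path work (metrics, logging) from undoing a successful
# transmission. All collaborators are hypothetical stand-ins.
def process_batch(queue, codec, output, events, metrics)
  output.multi_receive(codec.multi_encode(events))
  queue.ack(events) # transmission succeeded; ack before the unwind
  begin
    metrics.call(events)
  rescue => e
    # A metrics/logging failure must not cause a re-delivery.
    warn("return-path error: #{e.message}")
  end
end
```

The point is simply that the ACK happens as close to the point of transmission as possible, so a failure further up the return path cannot cause duplicate delivery.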
@andrewvc - spool size, being a config setting, and batch size, a command-line option, are not guaranteed to be equal. Of course, we could force this. I am proposing solutions that are more generic in the context of channels, where knowing when an event is 'done' is more deterministic.
OTOH, if we are going to modify outputs and codecs to be batch aware, then by using one Batch object vs. one Event we can hide the multi-ness inside the objects: `output.receive(codec.encode(object))`, where object is a Batch instance or an Event instance.
However, components means minimal patching to codecs and outputs.
@guyboertje my point is that there is no need for spooling if we have the batch, the spooling only exists because compressing a single event into a block is pointless. Compressing a batch should be fine. We should remove spool size as a configuration option. That one option in that one codec is the only reason we would need the complexity of components. By removing that we can just use simple function calls as I mentioned before and call it a day.
Mocking out a codec for plugin testing is irritatingly hard due to the async API. While the async API is fine, I believe the synchronous logic should be wrapped in a blocking `#encode_sync` method. That would make tests like this: look like this:
This was inspired by the discussion over this PR on the SNS input: https://github.com/logstash-plugins/logstash-output-sns/pull/6/files#r32961911