fluent / fluent-plugin-flume

Flume input and output plugin for Fluentd

out_flume: Use latest thrift protocol #7

Closed cosmo0920 closed 8 years ago

cosmo0920 commented 8 years ago

I've tried to migrate out_flume to the latest Flume Thrift protocol.

Limitations

The latest Flume protocol does not support a sink from Flume, so we cannot implement in_flume with the Thrift protocol. (The Avro protocol does not support a sink either.)

How to test

Prepare

# thrift-single.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# flume.conf: Fluentd configuration (passed to fluentd -c in the Test step)
<source>
  type forward
  bind 0.0.0.0
  port 24224
</source>

<filter test.flume>
  type stdout
</filter>

<match **>
  type flume
  host localhost
  port 4141
</match>
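
The two configurations only work together if the Fluentd `<match>` block sends to the port the Flume Thrift source listens on (4141 in both files here). As a rough pre-flight check, the Python sketch below reads that port from each config and compares them. The helper names `flume_source_port` and `fluentd_match_port` are hypothetical, the embedded config strings are trimmed copies of the files above, and the parsing is deliberately naive rather than a full parser for either format.

```python
import re

# Trimmed copies of the two configs above (assumption: same values as the files on disk).
FLUME_CONF = """
a1.sources.r1.type = thrift
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
"""

FLUENTD_CONF = """
<source>
  type forward
  bind 0.0.0.0
  port 24224
</source>

<match **>
  type flume
  host localhost
  port 4141
</match>
"""


def flume_source_port(text, agent="a1", source="r1"):
    """Read <agent>.sources.<source>.port from properties-style text."""
    key = f"{agent}.sources.{source}.port"
    for line in text.splitlines():
        line = line.strip()
        # Skip blanks, comments, and anything that is not key = value.
        if not line or line.startswith("#") or "=" not in line:
            continue
        name, value = (part.strip() for part in line.split("=", 1))
        if name == key:
            return int(value)
    return None


def fluentd_match_port(text):
    """Read the port inside the first <match> block, ignoring <source> blocks."""
    block = re.search(r"<match\b[^>]*>(.*?)</match>", text, re.DOTALL)
    if block is None:
        return None
    port = re.search(r"^\s*port\s+(\d+)\s*$", block.group(1), re.MULTILINE)
    return int(port.group(1)) if port else None


print(flume_source_port(FLUME_CONF) == fluentd_match_port(FLUENTD_CONF))
```

If the two ports differ, events would presumably never reach the Thrift source, so this is a cheap thing to rule out before starting both processes.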

Test

$ bin/flume-ng agent --conf conf --conf-file thrift-single.conf --name a1 -Dflume.root.logger=INFO,console
$ fluentd -c flume.conf -p lib/fluent/plugin # with this branch
$ echo '{"test":"flume!"}' | fluent-cat test.flume 
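
Judging from the headers in the Result log, the record sent by `fluent-cat` appears to reach Flume as an event with `timestamp` and `tag` headers and the JSON record as its body. The Python sketch below illustrates that shape only. It is not the plugin's Ruby code, the function name `to_flume_event` is hypothetical, and representing header values as strings is an assumption.

```python
import json
import time


def to_flume_event(tag, record, timestamp=None):
    """Hypothetical mapping of a Fluentd (tag, time, record) to a Flume-style event."""
    ts = int(time.time()) if timestamp is None else int(timestamp)
    return {
        # Assumption: header values are carried as strings.
        "headers": {"timestamp": str(ts), "tag": tag},
        # Body is the compact JSON encoding of the record, as bytes.
        "body": json.dumps(record, separators=(",", ":")).encode("utf-8"),
    }


# Timestamp value taken from the Result log in this thread.
event = to_flume_event("test.flume", {"test": "flume!"}, timestamp=1448447156)
print(event["headers"])
print(event["body"])
```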

Result

2015-11-25 19:25:46,778 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ThriftSource.getProtocolFactory(ThriftSource.java:302)] Using TCompactProtocol
2015-11-25 19:25:47,784 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2015-11-25 19:25:47,784 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
2015-11-25 19:25:47,784 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.ThriftSource.start(ThriftSource.java:250)] Started Thrift source.
2015-11-25 19:25:56,644 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestamp=1448447156, tag=test.flume} body: 7B 22 74 65 73 74 22 3A 22 66 6C 75 6D 65 21 22 {"test":"flume!" }

The logger sink displayed {"test":"flume!" }, Yay!
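
The hex dump in the LoggerSink line can be decoded to check what actually arrived. The Python sketch below does that, assuming the dump was copied correctly from the log above. It decodes to 16 bytes, one fewer than the 17-byte record that was sent, which would be consistent with LoggerSink printing only the first 16 bytes of the body by default (an assumption about the sink's defaults, not something stated in this thread).

```python
import json

# Hex dump copied from the LoggerSink line in the Result log.
HEX_DUMP = "7B 22 74 65 73 74 22 3A 22 66 6C 75 6D 65 21 22"


def decode_hex_dump(dump):
    """Turn a space-separated hex dump into the UTF-8 text it encodes."""
    return bytes.fromhex(dump.replace(" ", "")).decode("utf-8")


# The record as sent with fluent-cat, in compact JSON form.
sent = json.dumps({"test": "flume!"}, separators=(",", ":"))
logged = decode_hex_dump(HEX_DUMP)

print(logged)                    # {"test":"flume!"
print(len(sent), len(logged))    # 17 16
print(sent.startswith(logged))   # True
```

So the closing brace in the log line comes from the log format itself, while the logged body is a prefix of the record that was sent.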

repeatedly commented 8 years ago

Thanks for the great patch!

cosmo0920 commented 8 years ago

Wow, thanks for such a quick response and for merging!