fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0
12.93k stars 1.34k forks

in_unix: causes "[error]: #0 unexpected error in json payload error" or "[warn]: #0 incoming data is broken" with huge data #4692

Open Watson1978 opened 3 weeks ago

Watson1978 commented 3 weeks ago

Describe the bug

When the client sends very large data in a single send, in_unix logs an error for JSON data or a warning for MessagePack data. In my environment, this always happens when I send more than 256 KB at once.

To Reproduce

The client simply sends data using the following code:

# frozen_string_literal: true
require "socket"
require "json"
require "msgpack"

FORMAT = :json # or :msgpack
UNIX_SOCKET_PATH = "/tmp/fluentd-unix.sock"

DATA_LENGTH = 1024 * 256 # 256 KB of "a" characters per record

# Builds one event [tag, time, record] in the selected wire format.
def data_generator
  d = ['test', Time.now.to_i, {"length_#{DATA_LENGTH}": "a" * DATA_LENGTH}]
  case FORMAT
  when :json
    d.to_json
  when :msgpack
    MessagePack.pack(d)
  else
    raise "unknown format: #{FORMAT}"
  end
end

begin
  s = UNIXSocket.new(UNIX_SOCKET_PATH)
  # Send one large event per second, each in a single send call.
  loop do
    s.send(data_generator, 0)
    sleep 1
  end
rescue Errno::EPIPE => e
  p e
ensure
  s&.close
end

Client side:

ruby unix_client.rb

Fluentd side:

bundle exec bin/fluentd -c ~/socket_unix.conf --workers 1

Expected behavior

No errors

Your Environment

- Fluentd version: git master
- Package version:
- Operating system:
- Kernel version:

Your Configuration

<source>
  @type unix
  path /tmp/fluentd-unix.sock
</source>

<match **>
  @type file
  path /tmp/fluentd.log
</match>

Your Error Log

JSON

Sending JSON data from the client:

$ bundle exec bin/fluentd -c ~/socket_unix.conf --workers 1
2024-11-02 09:44:04 +0900 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:44:04 +0900 [info]: parsing config file is succeeded path="/home/watson/socket_unix.conf"
2024-11-02 09:44:04 +0900 [info]: gem 'fluentd' version '1.17.1'
2024-11-02 09:44:04 +0900 [warn]: define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:44:04 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type unix
    path "/tmp/fluentd-unix.sock"
  </source>
  <match **>
    @type file
    path "/tmp/fluentd.log"
    <buffer time>
      path "/tmp/fluentd.log"
    </buffer>
  </match>
</ROOT>
2024-11-02 09:44:04 +0900 [info]: starting fluentd-1.17.1 pid=16001 ruby="3.3.5"
2024-11-02 09:44:04 +0900 [info]: spawn command to main:  cmdline=["/home/watson/.rbenv/versions/3.3.5/bin/ruby", "-r/home/watson/.rbenv/versions/3.3.5/lib/ruby/site_ruby/3.3.0/bundler/setup", "-Eascii-8bit:ascii-8bit", "bin/fluentd", "-c", "/home/watson/socket_unix.conf", "--workers", "1", "--under-supervisor"]
2024-11-02 09:44:04 +0900 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:44:04 +0900 [info]: adding match pattern="**" type="file"
2024-11-02 09:44:04 +0900 [info]: adding source type="unix"
2024-11-02 09:44:04 +0900 [warn]: #0 define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:44:04 +0900 [info]: #0 starting fluentd worker pid=16025 ppid=16001 worker=0
2024-11-02 09:44:04 +0900 [warn]: #0 Found existing '/tmp/fluentd-unix.sock'. Remove this file for in_unix plugin
2024-11-02 09:44:04 +0900 [info]: #0 listening fluent socket on /tmp/fluentd-unix.sock
2024-11-02 09:44:04 +0900 [info]: #0 fluentd worker is now running worker=0
2024-11-02 09:44:07 +0900 [error]: #0 unexpected error in json payload error="lexical error: invalid string in json text.\n                                     [\"test\",1730508247,{\"length_26214\n                     (right here) ------^\n"
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin/in_unix.rb:175:in `<<'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin/in_unix.rb:175:in `on_read_json'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/io.rb:123:in `on_readable'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/io.rb:186:in `on_readable'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/loop.rb:88:in `run_once'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/.rbenv/versions/3.3.5/lib/ruby/gems/3.3.0/gems/cool.io-1.9.0/lib/cool.io/loop.rb:88:in `run'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin_helper/event_loop.rb:93:in `block in start'
  2024-11-02 09:44:07 +0900 [error]: #0 /home/watson/src/fluentd/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

MessagePack

Sending MessagePack data from the client:

$ bundle exec bin/fluentd -c ~/socket_unix.conf --workers 1
2024-11-02 09:45:01 +0900 [info]: init supervisor logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:45:01 +0900 [info]: parsing config file is succeeded path="/home/watson/socket_unix.conf"
2024-11-02 09:45:01 +0900 [info]: gem 'fluentd' version '1.17.1'
2024-11-02 09:45:01 +0900 [warn]: define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:45:01 +0900 [info]: using configuration file: <ROOT>
  <source>
    @type unix
    path "/tmp/fluentd-unix.sock"
  </source>
  <match **>
    @type file
    path "/tmp/fluentd.log"
    <buffer time>
      path "/tmp/fluentd.log"
    </buffer>
  </match>
</ROOT>
2024-11-02 09:45:01 +0900 [info]: starting fluentd-1.17.1 pid=16161 ruby="3.3.5"
2024-11-02 09:45:01 +0900 [info]: spawn command to main:  cmdline=["/home/watson/.rbenv/versions/3.3.5/bin/ruby", "-r/home/watson/.rbenv/versions/3.3.5/lib/ruby/site_ruby/3.3.0/bundler/setup", "-Eascii-8bit:ascii-8bit", "bin/fluentd", "-c", "/home/watson/socket_unix.conf", "--workers", "1", "--under-supervisor"]
2024-11-02 09:45:01 +0900 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2024-11-02 09:45:01 +0900 [info]: adding match pattern="**" type="file"
2024-11-02 09:45:01 +0900 [info]: adding source type="unix"
2024-11-02 09:45:01 +0900 [warn]: #0 define <match fluent.**> to capture fluentd logs in top level is deprecated. Use <label @FLUENT_LOG> instead
2024-11-02 09:45:01 +0900 [info]: #0 starting fluentd worker pid=16185 ppid=16161 worker=0
2024-11-02 09:45:01 +0900 [warn]: #0 Found existing '/tmp/fluentd-unix.sock'. Remove this file for in_unix plugin
2024-11-02 09:45:01 +0900 [info]: #0 listening fluent socket on /tmp/fluentd-unix.sock
2024-11-02 09:45:01 +0900 [info]: #0 fluentd worker is now running worker=0
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
2024-11-02 09:45:04 +0900 [warn]: #0 incoming data is broken: msg=97
--- (snip) ---
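One plausible reading of msg=97 (an inference, not confirmed in this issue): 97 is the byte value of "a" (0x61), and in the MessagePack format any byte from 0x00 to 0x7f is a complete "positive fixint" whose value is the byte itself. If a chunk that begins in the middle of the long "aaaa..." string is decoded as the start of a new message, every byte comes out as the integer 97, which is not a valid event. The toy decoder below handles only that one MessagePack type; decode_fixints is a hypothetical name used for illustration, not part of msgpack-ruby.

```ruby
# Toy decoder for the MessagePack "positive fixint" type only (hypothetical
# helper): a byte in 0x00..0x7f encodes the integer equal to itself.
def decode_fixints(bytes)
  bytes.each_byte.map do |b|
    raise ArgumentError, format("not a positive fixint: 0x%02x", b) if b > 0x7f
    b
  end
end

# A chunk cut from the middle of the 256 KB "aaaa..." string value.
chunk = "a" * 5
p decode_fixints(chunk) # => [97, 97, 97, 97, 97]
```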

Additional context

No response

Watson1978 commented 3 weeks ago

cool.io delivers received data to the application in pieces of at most 16384 bytes per on_read call. I suspect Fluentd does not handle these partial pieces properly.
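For a sense of scale: with a 10-digit timestamp, the JSON payload built by the reproduction client is 262184 bytes (the same "sent size" the script below prints), so with at most 16384 bytes per read it needs at least 17 reads to arrive. The sketch recomputes this with the timestamp fixed to the one in the Fluentd log; READ_SIZE is a name introduced here for the 16384-byte limit.

```ruby
require "json"

DATA_LENGTH = 1024 * 256
READ_SIZE = 16_384 # per-read limit described above

# Same payload shape as the client's generator, with the timestamp fixed to
# the value seen in the Fluentd log so the size is reproducible.
payload = ['test', 1730508247, { "length_#{DATA_LENGTH}": "a" * DATA_LENGTH }].to_json

size = payload.bytesize
min_reads = (size + READ_SIZE - 1) / READ_SIZE # ceiling division

puts size      # => 262184
puts min_reads # => 17
```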

require 'bundler/inline'
gemfile do
  source 'https://rubygems.org'
  gem 'cool.io'
end

require "socket"
require "json"
require 'fileutils'

UNIX_SOCKET_PATH = "/tmp/fluentd-unix.sock"
DATA_LENGTH = 1024 * 256

FileUtils.rm_f(UNIX_SOCKET_PATH)

class Client
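  # Builds one JSON-encoded event, same shape as the reproduction client.
  def data_generator
    d = ['test', Time.now.to_i, {"length_#{DATA_LENGTH}": "a" * DATA_LENGTH}]
    d.to_json
  end

  def run
    Thread.new do
      s = UNIXSocket.new(UNIX_SOCKET_PATH)
      sleep 0.5
      data = data_generator
      s.send(data, 0) # a single send of the whole ~256 KB payload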
      puts "[client] sent size = #{data.size}"
      s.close

      puts "[client] Finished!!"
    end
  end
end

class ServerConnection < Cool.io::UNIXSocket
  def on_connect
    puts "[server] connected"
  end

  def on_close
    puts "[server] disconnected"

    exit
  end

  # Called once per read: data is one piece of the stream, not a whole message.
  def on_read(data)
    puts "[server] received size = #{data.size}"
  end
end

server = Cool.io::UNIXServer.new(UNIX_SOCKET_PATH, ServerConnection)
server.attach(Cool.io::Loop.default)

Client.new.run

Cool.io::Loop.default.run

$ ruby coolio.rb
/home/watson/.rbenv/versions/3.3.5/lib/ruby/3.3.0/json/common.rb:3: warning: ostruct was loaded from the standard library, but will no longer be part of the default gems starting from Ruby 3.5.0.
You can add ostruct to your Gemfile or gemspec to silence this warning.
[server] connected
[client] sent size = 262184
[server] received size = 16384
[client] Finished!!
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 16384
[server] received size = 6272
[server] disconnected
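The splitting itself is not specific to cool.io: a UNIX stream socket carries a byte stream with no message boundaries, so a reader that asks for at most 16384 bytes per call receives one large write in pieces. The sketch below uses only Ruby's standard library (UNIXSocket.pair and IO#readpartial) in place of cool.io; read_in_chunks is a hypothetical helper, and the exact piece sizes depend on the OS, so only their bounds and total are meaningful.

```ruby
require "socket"
require "json"

READ_SIZE = 16_384 # same per-read limit as cool.io in the output above

# Writes payload into one end of a socket pair and reads it back from the
# other end in pieces of at most READ_SIZE bytes; returns the piece sizes.
def read_in_chunks(payload)
  writer, reader = UNIXSocket.pair
  sender = Thread.new do
    writer.write(payload) # one write, like the client's single send
    writer.close          # EOF tells the reader the stream is over
  end

  sizes = []
  loop do
    sizes << reader.readpartial(READ_SIZE).bytesize
  rescue EOFError
    break
  end
  sender.join
  reader.close
  sizes
end

payload = ['test', 1730508247, { "length_262144" => "a" * 262_144 }].to_json
sizes = read_in_chunks(payload)
puts "reads = #{sizes.size}, total = #{sizes.sum}, largest = #{sizes.max}"
```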
Watson1978 commented 3 weeks ago

I suspect other plugins that use the cool.io gem have the same issue.
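Whatever the eventual fix in Fluentd looks like, a receiver that survives this chunking has to keep its parsing state between reads instead of treating each piece as a complete message. The sketch below shows that principle for JSON only. JsonStreamSplitter is a hypothetical class, not Fluentd's code or API; it assumes the peer sends well-formed top-level JSON arrays or objects back to back, and it tracks bracket depth, string literals and backslash escapes to find where each value ends.

```ruby
require "json"

# Hypothetical helper: reassembles top-level JSON arrays/objects from
# arbitrarily split chunks. Assumes well-formed input.
class JsonStreamSplitter
  def initialize
    @buf = "".b          # unconsumed bytes, kept as binary
    @pos = 0             # next byte in @buf to scan
    @depth = 0           # current [ / { nesting depth
    @in_string = false   # inside a "..." literal?
    @escaped = false     # previous byte was a backslash inside a string?
  end

  # Appends one chunk and yields every value that this chunk completes.
  def feed(chunk)
    @buf << chunk.b
    while @pos < @buf.bytesize
      byte = @buf.getbyte(@pos)
      @pos += 1
      if @in_string
        if @escaped
          @escaped = false
        elsif byte == 0x5C # backslash
          @escaped = true
        elsif byte == 0x22 # closing quote
          @in_string = false
        end
      elsif byte == 0x22 # opening quote
        @in_string = true
      elsif byte == 0x5B || byte == 0x7B # [ or {
        @depth += 1
      elsif byte == 0x5D || byte == 0x7D # ] or }
        @depth -= 1
        next unless @depth.zero?
        text = @buf.byteslice(0, @pos).force_encoding(Encoding::UTF_8)
        @buf = @buf.byteslice(@pos..) # keep any bytes of the next value
        @pos = 0
        yield JSON.parse(text)
      end
    end
  end
end

# Usage: split the client's payload into 16384-byte pieces and feed them.
payload = ['test', 1730508247, { "length_262144" => "a" * 262_144 }].to_json
chunks = (0...payload.bytesize).step(16_384).map { |i| payload.byteslice(i, 16_384) }

splitter = JsonStreamSplitter.new
events = []
chunks.each { |c| splitter.feed(c) { |v| events << v } }
puts events.size # => 1
```

Because @pos remembers where the previous call stopped, already-scanned bytes are not rescanned when the next piece arrives; a real plugin would also want an upper bound on how much unconsumed data it is willing to buffer.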