fluent / fluentd

Fluentd: Unified Logging Layer (project under CNCF)
https://www.fluentd.org
Apache License 2.0

Broken hadoop_snappy compression in some cases #4470

Open alheio opened 2 months ago

alheio commented 2 months ago

Describe the bug

Hello!

I have logs from two apps.

- app1: in_tail input on one file, webhdfs output with a file buffer and hadoop_snappy compression
- app2: in_tail input on files matched by a mask, webhdfs output with a file buffer and hadoop_snappy compression

- app1 watches 1 file; its buffer accumulates chunks of about 20 MB per 2 minutes and flushes them to HDFS on a time basis.
- app2 watches 50 files; its buffer accumulates about 50 MB per 5 minutes and flushes the data to HDFS on a time basis.

app1 works fine, but with app2 I get "invalid compression" errors on about 5% of the files (chunks), from different hosts, when processing the files.

Exception in thread "main" java.lang.InternalError: Could not decompress data. Input is invalid.
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompressBytesDirect(Native Method)
        at org.apache.hadoop.io.compress.snappy.SnappyDecompressor.decompress(SnappyDecompressor.java:239)
        at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:88)
        at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
        at java.io.InputStream.read(InputStream.java:101)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:93)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:61)
        at org.apache.hadoop.io.IOUtils.copyBytes(IOUtils.java:121)
        at org.apache.hadoop.fs.shell.Display$Cat.printToStdout(Display.java:106)
        at org.apache.hadoop.fs.shell.Display$Cat.processPath(Display.java:101)
        at org.apache.hadoop.fs.shell.Command.processPaths(Command.java:317)
        at org.apache.hadoop.fs.shell.Command.processPathArgument(Command.java:289)
        at org.apache.hadoop.fs.shell.Command.processArgument(Command.java:271)
        at org.apache.hadoop.fs.shell.Command.processArguments(Command.java:255)
        at org.apache.hadoop.fs.shell.FsCommand.processRawArguments(FsCommand.java:118)
        at org.apache.hadoop.fs.shell.Command.run(Command.java:165)
        at org.apache.hadoop.fs.FsShell.run(FsShell.java:315)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
        at org.apache.hadoop.fs.FsShell.main(FsShell.java:372)

How can I tune the config? Why does hadoop_snappy generate invalid blocks? If I set compress to "text" instead of "hadoop_snappy", everything works fine, and there are no invalid records when parsing the data in HDFS.
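One way to narrow this down is to find where in a failing file the framing first goes wrong. Below is a minimal sketch, assuming the file follows the block layout used by Hadoop's BlockCompressorStream as I understand it: a 4-byte big-endian uncompressed block length, then one or more chunks, each with a 4-byte big-endian compressed length followed by raw Snappy data. The names `find_bad_block` and `read_varint` are hypothetical helpers, not part of Fluentd or Hadoop. The script checks only the framing and the length preamble of each chunk; it does not decompress the payload, so damage inside a chunk body would go unnoticed.

```python
import io
import struct
import sys


def read_varint(buf):
    """Decode the little-endian base-128 varint that opens a raw Snappy stream."""
    value = 0
    for i, byte in enumerate(buf[:5]):
        value |= (byte & 0x7F) << (7 * i)
        if byte < 0x80:
            return value
    raise ValueError("missing or unterminated Snappy length preamble")


def find_bad_block(stream):
    """Return (offset, reason) for the first framing error, or None if none is found.

    Assumed layout (not confirmed against the plugin's output):
    [4-byte BE uncompressed block length] then chunks of
    [4-byte BE compressed length][raw Snappy data] until the block is covered.
    """
    while True:
        block_start = stream.tell()
        header = stream.read(4)
        if not header:
            return None  # clean end of file
        if len(header) < 4:
            return block_start, "truncated block header"
        (block_len,) = struct.unpack(">I", header)

        seen = 0  # uncompressed bytes accounted for in this block
        while seen < block_len:
            chunk_start = stream.tell()
            header = stream.read(4)
            if len(header) < 4:
                return chunk_start, "truncated chunk header"
            (chunk_len,) = struct.unpack(">I", header)
            chunk = stream.read(chunk_len)
            if len(chunk) < chunk_len:
                return chunk_start, "chunk runs past end of file"
            try:
                seen += read_varint(chunk)
            except ValueError as exc:
                return chunk_start, str(exc)
            if seen > block_len:
                return chunk_start, "chunk sizes exceed declared block size"


if __name__ == "__main__" and len(sys.argv) == 2:
    # Usage: python check_framing.py <local copy of the HDFS file>
    with open(sys.argv[1], "rb") as fh:
        result = find_bad_block(fh)
    print("framing looks consistent" if result is None else "offset %d: %s" % result)
```

To use it, copy a failing file out of HDFS first (for example with `hdfs dfs -get`) and pass the local path as the only argument. If the reported offset lines up with a boundary between two writes, that would point at how chunks are concatenated rather than at the Snappy encoding itself.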

To Reproduce

To reproduce the issue, follow the setup in the "Describe the bug" section with the following Dockerfile:

FROM fluent/fluentd:v1.16.5-debian-amd64-1.0

USER root

# Timezone
ENV TZ="Europe/Moscow"
RUN ln -snf "/usr/share/zoneinfo/$TZ" "/etc/localtime" \
    && echo "$TZ" > "/etc/timezone"

# for snappy gem native libs building
RUN apt update \
    && apt -y install build-essential autoconf automake libtool libsnappy-dev \
    && apt clean

# plugins
RUN fluent-gem install \
    fluent-plugin-webhdfs \
    fluent-plugin-prometheus \
    snappy

USER fluent

Expected behavior

Valid compression in the resulting files.

Your Environment

- Official fluentd docker image fluent/fluentd:v1.16.5-debian-amd64-1.0
- Fluentd version: 1.16.5
- gem 'fluent-plugin-prometheus' version '2.1.0'
- gem 'fluent-plugin-webhdfs' version '1.6.0'

Your Configuration

  <system>
    workers 2
  </system>

  <source>
    @type monitor_agent
    bind "0.0.0.0"
    port 24220
  </source>

  <source>
    @type prometheus
    bind "0.0.0.0"
    port 24231
    metrics_path "/metrics"
  </source>

  <filter mytags.*>
    @type prometheus
    <metric>
      name fluentd_input_status_num_records_total
      type counter
      desc The total number of incoming records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </filter>

  <source>
    @type prometheus_monitor
  </source>

  <source>
    @type prometheus_output_monitor
    interval 10
    <labels>
      hostname ${hostname}
    </labels>
  </source>

  <worker 0>
    <source>
      tag "mytags.app-tag-1"
      @type tail
      path "/path/to/logs/app1.log"
      pos_file "/home/fluentd/pipeline_workdir/app1.log.pos"
      read_from_head false
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <match mytags.app-tag-1>
      @type webhdfs
      namenode namenode1:50070
      standby_namenode namenode2:50070
      ignore_start_check_error true
      retry_known_errors yes
      retry_times 10000
      retry_interval 30
      open_timeout 180
      read_timeout 180
      append no
      username hdfsuser
      path "/path/in/hdfs/app1/%Y/%m/%d/%H/#{Socket.gethostname}.${chunk_id}.log"
      compress hadoop_snappy
      <format>
        @type "single_value"
      </format>
      <buffer time>
        @type "file"
        path "/home/fluentd/pipeline_workdir/app1.log.buf"
        chunk_limit_size 512MB
        flush_mode interval
        flush_interval 2m
        flush_thread_count 5
        retry_type periodic
        retry_wait 30s
        timekey_zone "+0300"
        timekey 3600
      </buffer>
    </match>
  </worker>

  <worker 1>
    <source>
      tag "mytags.app2"
      @type tail
      path "/path2/to/logs/app2*.log"
      pos_file "/home/fluentd/pipeline_workdir/app2.log.pos"
      read_from_head false
      follow_inodes true
      rotate_wait 60
      <parse>
        @type "none"
        unmatched_lines
      </parse>
    </source>
    <match mytags.app2>
      @type webhdfs
      namenode namenode1:50070
      standby_namenode namenode2:50070
      ignore_start_check_error true
      retry_known_errors yes
      retry_times 10000
      retry_interval 30
      open_timeout 180
      read_timeout 180
      append no
      username hdfsuser
      path "/path/in/hdfs/app2/%Y/%m/%d/%H/#{Socket.gethostname}.${chunk_id}.log"
      compress hadoop_snappy
      <format>
        @type "single_value"
      </format>
      <buffer time>
        @type "file"
        path "/home/fluentd/pipeline_workdir/app2.log.buf"
        chunk_limit_size 512MB
        flush_mode interval
        flush_interval 2m
        flush_thread_count 5
        retry_type periodic
        retry_wait 30s
        timekey_zone "+0300"
        timekey 3600
      </buffer>
    </match>
  </worker>

Your Error Log

No errors on the Fluentd side.

Additional context

The only difference between the two pipelines is that in_tail watches 50 files by mask.

alheio commented 2 months ago

If I place both pipelines on one worker, I get "invalid snappy compression" errors on both pipelines (or only on the first pipeline, if the second pipeline uses the "text" compression codec).