fluent-plugins-nursery / fluent-plugin-concat

Fluentd filter plugin to concatenate multiline logs split across multiple events.
MIT License

Plugin is concatenating multiple lines into a single event #65

Status: open. Akhisar opened this issue 5 years ago.

Akhisar commented 5 years ago

Problem

The plugin is concatenating multiple separate log lines into a single event.

Steps to replicate

Config:

<source>
  @id fluentd-containers-flink.log
  @type tail
  path /var/log/containers/**flink**.log
  pos_file /var/log/fluentd-containers-flink.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag raw.kubernetes.*
  format json
  @label @CONCAT_FLINK
</source>

<label @CONCAT_FLINK>
  <filter **flink**>
    @type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /\d{4}-\d{1,2}-\d{1,2}/
    continuous_line_regexp /Caused\sby:|\s+at\s.*|^java|\s+ \.\.\. (\d)+ more/
    #multiline_end_regexp /\s+.*more$/
    flush_interval 10
    timeout_label @OUTPUT
  </filter>
  <filter **flink**>
    @type parser
    key_name log
    reserve_data true
    <parse>
      @type regexp
      expression /^(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s(?<log_level>\w+)\s+(?<log_logger>.*?\s)\s+-(?<log_message>.*)/m
    </parse>
  </filter>

  <match>
    @type relabel
    @label @OUTPUT
  </match>
</label>

Log lines:

{"log":"2019-02-08 10:03:50,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35000] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:03:50.885661556Z"}
{"log":"2019-02-08 10:03:51,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:03:51.671149184Z"}
{"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:03:51.671201184Z"}
{"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:03:51.671218384Z"}
{"log":"2019-02-08 10:04:05,884 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35602] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:05.884977614Z"}
{"log":"2019-02-08 10:04:06,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:04:06.671248435Z"}
{"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:04:06.671281135Z"}
{"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:06.671293435Z"}
{"log":"2019-02-08 10:04:20,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:36216] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:20.886034327Z"}
{"log":"2019-02-08 10:04:21,671 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:04:21.672123047Z"}
{"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:04:21.672146847Z"}
{"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:21.672151847Z"}
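To see how the first filter's regexes classify these sample lines, here is a small Python sketch. It is not the plugin's code: it assumes Python's re module behaves like Ruby's Regexp for these particular patterns, with re.M standing in for Ruby's line-anchored ^. The log values are abbreviated.

```python
import re

# Regexes from the first <filter> (@type concat) block, translated from Ruby.
# re.M mirrors Ruby, where ^ always anchors at the start of a line.
START = re.compile(r"\d{4}-\d{1,2}-\d{1,2}", re.M)
CONTINUOUS = re.compile(r"Caused\sby:|\s+at\s.*|^java|\s+ \.\.\. (\d)+ more", re.M)

# `log` values from the sample above (messages abbreviated).
sample = [
    "2019-02-08 10:03:50,885 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/172.21.0.106:35000] failed\n",
    "2019-02-08 10:03:51,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.\n",
    "java.io.IOException: Unknown operation 71\n",
    "\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n",
]

def classify(line):
    """Label a line by which of the two regexes it matches (start wins)."""
    if START.search(line):
        return "start"
    if CONTINUOUS.search(line):
        return "continuation"
    return "other"

for line in sample:
    print(f"{classify(line):<12} {line.strip()[:60]}")
```

Both timestamped lines (WARN and ERROR) match multiline_start_regexp, and the java.io.IOException and "at ..." lines match continuous_line_regexp. So with these patterns alone, one would expect the WARN entry and the ERROR entry to begin separate events.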

Current output:

Currently the log field contains both entries:

2019-02-08 09:58:06,179 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35690] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded
2019-02-08 09:58:10,515 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.

java.io.IOException: Unknown operation 71

    at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)

Expected Behavior

I expect the two entries to be emitted as separate events.

Your environment

fluent-plugin-concat (2.3.0)
fluent-plugin-detect-exceptions (0.0.11)
fluent-plugin-elasticsearch (2.11.5)
fluent-plugin-grok-parser (2.4.0)
fluent-plugin-kubernetes_metadata_filter (2.0.0)
fluent-plugin-multi-format-parser (1.0.0)
fluent-plugin-prometheus (1.0.1)
fluent-plugin-systemd (1.0.1)

Akhisar commented 5 years ago

Pasting the config again:

<source>
  @id fluentd-containers-flink.log
  @type tail
  path /var/log/containers/**flink**.log
  pos_file /var/log/fluentd-containers-flink.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag raw.kubernetes.*
  format json
  @label @CONCAT_FLINK
</source>

<label @CONCAT_FLINK>
  <filter **flink**>
    @type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /\d{4}-\d{1,2}-\d{1,2}/
    continuous_line_regexp /Caused\sby:|\s+at\s.*|^java|\s+ \.\.\. (\d)+ more/
    #multiline_end_regexp /\s+.*more$/
    flush_interval 10
    timeout_label @OUTPUT
  </filter>
  <filter **flink**>
    @type parser
    key_name log
    reserve_data true
    <parse>
      @type regexp
      expression /^(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s(?<log_level>\w+)\s+(?<log_logger>.*?\s)\s+-(?<log_message>.*)/m
    </parse>
  </filter>

  <match>
   @type relabel
   @label @OUTPUT
  </match>
</label>
okkez commented 5 years ago

What is your expected result? If you want to remove the extra newline (\n) between concatenated lines, you can use the separator parameter.

I've tested with the following configuration:

<source>
  @type dummy
  dummy [
    {"log":"2019-02-08 10:03:50,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35000] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:03:50.885661556Z"},
    {"log":"2019-02-08 10:03:51,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:03:51.671149184Z"},
    {"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:03:51.671201184Z"},
    {"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:03:51.671218384Z"},
    {"log":"2019-02-08 10:04:05,884 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35602] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:05.884977614Z"},
    {"log":"2019-02-08 10:04:06,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:04:06.671248435Z"},
    {"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:04:06.671281135Z"},
    {"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:06.671293435Z"},
    {"log":"2019-02-08 10:04:20,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:36216] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:20.886034327Z"},
    {"log":"2019-02-08 10:04:21,671 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","time":"2019-02-08T10:04:21.672123047Z"},
    {"log":"java.io.IOException: Unknown operation 71\n","stream":"stdout","time":"2019-02-08T10:04:21.672146847Z"},
    {"log":"\u0009at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:21.672151847Z"}
  ]
  tag dummy
</source>

<filter dummy>
  @type concat
  key log
  stream_identity_key container_id
  multiline_start_regexp /\d{4}-\d{1,2}-\d{1,2}/
  continuous_line_regexp /Caused\sby:|\s+at\s.*|^java|\s+ \.\.\. (\d)+ more/
  #multiline_end_regexp /\s+.*more$/
  flush_interval 10
  timeout_label @OUTPUT
</filter>

<match dummy>
  @type stdout
</match>

The result:

2019-02-12 11:07:10.093425404 +0900 dummy: {"log":"2019-02-08 10:03:50,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35000] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:03:50.885661556Z"}
2019-02-12 11:07:13.098477408 +0900 dummy: {"log":"2019-02-08 10:03:51,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n\njava.io.IOException: Unknown operation 71\n\n\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:03:51.671149184Z"}
2019-02-12 11:07:14.000064485 +0900 dummy: {"log":"2019-02-08 10:04:05,884 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:35602] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:05.884977614Z"}
2019-02-12 11:07:17.004081321 +0900 dummy: {"log":"2019-02-08 10:04:06,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n\njava.io.IOException: Unknown operation 71\n\n\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:06.671248435Z"}
2019-02-12 11:07:18.005495485 +0900 dummy: {"log":"2019-02-08 10:04:20,885 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.21.0.106:36216] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n","stream":"stdout","time":"2019-02-08T10:04:20.886034327Z"}
2019-02-12 11:07:21.010161527 +0900 dummy: {"log":"2019-02-08 10:04:21,671 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n\njava.io.IOException: Unknown operation 71\n\n\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","time":"2019-02-08T10:04:21.672123047Z"}
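The doubled \n\n in the result above is consistent with the default separator (\n) being inserted between buffered log values that already end in a newline. A minimal Python model of that join follows; it is an assumption inferred from the output, not the plugin's source, and join_buffered is a hypothetical helper.

```python
def join_buffered(lines, separator="\n"):
    """Hypothetical model: insert `separator` between buffered `log` values,
    each of which already ends in a newline."""
    return separator.join(lines)

# Buffered `log` values for one event (ERROR message abbreviated).
buffered = [
    "2019-02-08 10:03:51,670 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.\n",
    "java.io.IOException: Unknown operation 71\n",
    "\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n",
]

print(repr(join_buffered(buffered)))      # default separator "\n": doubled newlines
print(repr(join_buffered(buffered, "")))  # empty separator: one newline per line
```

In the filter, the empty-separator case would correspond to setting separator to an empty string; check the plugin README for the exact syntax in your version.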
Akhisar commented 5 years ago

Hi,

Thanks for the reply.

My issue is that in Kibana both log lines are concatenated together, which is strange.

[Screenshot from Kibana, 2019-02-08 at 15:35:01]

okkez commented 5 years ago

Hmm, that's strange. You can debug your configuration using the out_stdout and filter_stdout plugins, like the following:

<source>
  @id fluentd-containers-flink.log
  @type tail
  path /var/log/containers/**flink**.log
  pos_file /var/log/fluentd-containers-flink.log.pos
  time_format %Y-%m-%dT%H:%M:%S.%NZ
  tag raw.kubernetes.*
  format json
  @label @CONCAT_FLINK
</source>

<label @CONCAT_FLINK>
  <filter **flink**>
    @type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /\d{4}-\d{1,2}-\d{1,2}/
    continuous_line_regexp /Caused\sby:|\s+at\s.*|^java|\s+ \.\.\. (\d)+ more/
    #multiline_end_regexp /\s+.*more$/
    flush_interval 10
    timeout_label @OUTPUT
  </filter>
  <filter **flink**>
    @type stdout
  </filter>
  <filter **flink**>
    @type parser
    key_name log
    reserve_data true
    <parse>
      @type regexp
      expression /^(?<log_date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\s(?<log_level>\w+)\s+(?<log_logger>.*?\s)\s+-(?<log_message>.*)/m
    </parse>
  </filter>

  <match>
   @type relabel
   @label @OUTPUT
  </match>
</label>

<label @OUTPUT>
  <match>
    @type stdout
  </match>
</label>
Akhisar commented 5 years ago

I checked with stdout on the output side.

With this filter:

  <filter **flink**>
    @type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2},\d{1,3} .*/
    multiline_end_regexp /\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2},\d{1,3}/
    flush_interval 10
    timeout_label @OUTPUT
  </filter>
2019-03-05 07:05:33.348407995 +0000 kubernetes.var.log.containers.scenario-service-flink-jobmanager-547fcc8f48-6j7sq_nightly_scenario-service-flink-jobmanager-7306a89cc3da3f55ae6c37c18228e658388f70168796311110459ec83b8fb4b6.log: {"log":"2019-03-05 07:05:33,144 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n","stream":"stdout","log_date":"2019-03-05 07:05:33,144","log_level":"ERROR","log_logger":"org.apache.flink.runtime.blob.BlobServerConnection ","log_message":" Error while executing BLOB connection.\n","docker":{"container_id":"7306a89cc3da3f55ae6c37c18228e658388f70168796311110459ec83b8fb4b6"},"kubernetes":{"container_name":"scenario-service-flink-jobmanager","namespace_name":"nightly","pod_name":"scenario-service-flink-jobmanager-547fcc8f48-6j7sq","pod_id":"e096ba68-3bab-11e9-b301-92e9bbf34b8a","labels":{"app":"scenario-service-flink-jobmanager","pod-template-hash":"1039774904","release":"nightly-scenario-service-flink-jobmanager"},"host":"aks-nightly-49443436-2","master_url":"https://nightly-4f2ceb09.hcp.westeurope.azmk8s.io:443/api","namespace_id":"ccf92e65-3bab-11e9-b301-92e9bbf34b8a"},"cluster":"KubernetesNightlyFRP"}
2019-03-05 07:05:38 +0000 [info]: #0 stats - namespace_cache_size: 1, pod_cache_size: 4, namespace_cache_api_updates: 4, pod_cache_api_updates: 4, id_cache_miss: 4
2019-03-05 07:05:33.145215330 +0000 kubernetes.var.log.containers.scenario-service-flink-jobmanager-547fcc8f48-6j7sq_nightly_scenario-service-flink-jobmanager-7306a89cc3da3f55ae6c37c18228e658388f70168796311110459ec83b8fb4b6.log: {"log":"java.io.IOException: Unknown operation 71\n\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","docker":{"container_id":"7306a89cc3da3f55ae6c37c18228e658388f70168796311110459ec83b8fb4b6"},"kubernetes":{"container_name":"scenario-service-flink-jobmanager","namespace_name":"nightly","pod_name":"scenario-service-flink-jobmanager-547fcc8f48-6j7sq","pod_id":"e096ba68-3bab-11e9-b301-92e9bbf34b8a","labels":{"app":"scenario-service-flink-jobmanager","pod-template-hash":"1039774904","release":"nightly-scenario-service-flink-jobmanager"},"host":"aks-nightly-49443436-2","master_url":"https://nightly-4f2ceb09.hcp.westeurope.azmk8s.io:443/api","namespace_id":"ccf92e65-3bab-11e9-b301-92e9bbf34b8a"},"cluster":"KubernetesNightlyFRP"}

Here I get three log entries:

First: WARN akka.remote.transport.netty.NettyTransport - Remote connection to [/172.23.0.106:54728] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded

Second: ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.

Third: java.io.IOException: Unknown operation 71 at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)
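In this first variant, multiline_end_regexp matches the same timestamp prefix as multiline_start_regexp, so every timestamped line is both a first line and a last line. A quick Python check of the two patterns (translated from Ruby, with re.M mirroring Ruby's line-anchored ^; log values abbreviated):

```python
import re

# Start and end regexes from the first filter above, translated from Ruby.
START = re.compile(r"^\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2},\d{1,3} .*", re.M)
END = re.compile(r"\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2},\d{1,3}", re.M)

error_line = "2019-03-05 07:05:33,144 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.\n"
trace_line = "java.io.IOException: Unknown operation 71\n"

for line in (error_line, trace_line):
    # Print whether each pattern matches, followed by the start of the line.
    print(bool(START.search(line)), bool(END.search(line)), line.strip()[:50])
```

If the plugin treats a line that matches both patterns as a complete event on its own (an assumption, not confirmed in this thread), that would explain why the ERROR line is emitted separately from its stack trace.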

With this filter:

  <filter **flink**>
    @type concat
    key log
    stream_identity_key container_id
    multiline_start_regexp /^\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2},\d{1,3} .*/
    flush_interval 10
    timeout_label @OUTPUT
  </filter>
2019-03-05 06:59:47.428020218 +0000 kubernetes.var.log.containers.scenario-service-flink-jobmanager-547fcc8f48-6j7sq_nightly_scenario-service-flink-jobmanager-7306a89cc3da3f55ae6c37c18228e658388f70168796311110459ec83b8fb4b6.log: {"log":"2019-03-05 06:59:37,701 WARN  akka.remote.transport.netty.NettyTransport                    - Remote connection to [/172.23.0.106:43358] failed with org.apache.flink.shaded.akka.org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 40485760: 1195725860 - discarded\n2019-03-05 06:59:48,146 ERROR org.apache.flink.runtime.blob.BlobServerConnection            - Error while executing BLOB connection.\n\njava.io.IOException: Unknown operation 71\n\n\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n","stream":"stdout","docker":{"container_id":"7306a89cc3da3f55ae6c37c18228e658388f70168796311110459ec83b8fb4b6"},"kubernetes":{"container_name":"scenario-service-flink-jobmanager","namespace_name":"nightly","pod_name":"scenario-service-flink-jobmanager-547fcc8f48-6j7sq","pod_id":"e096ba68-3bab-11e9-b301-92e9bbf34b8a","labels":{"app":"scenario-service-flink-jobmanager","pod-template-hash":"1039774904","release":"nightly-scenario-service-flink-jobmanager"},"host":"aks-nightly-49443436-2","master_url":"https://nightly-4f2ceb09.hcp.westeurope.azmk8s.io:443/api","namespace_id":"ccf92e65-3bab-11e9-b301-92e9bbf34b8a"},"cluster":"KubernetesNightlyFRP"}

With this, all three logs are merged into one event. I want the first log as one event, and the second and third combined into the next event.
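For comparison, the grouping asked for here is what a simple start-regexp-only rule would produce. A minimal Python sketch of such a rule follows; it is a simplified model under stated assumptions, not the plugin's implementation, and group_by_start is a hypothetical helper. The log values are abbreviated from the merged event above.

```python
import re

# multiline_start_regexp from the second filter above, translated from Ruby.
START = re.compile(r"^\d{4}-\d{1,2}-\d{1,2} \d{2}:\d{2}:\d{2},\d{1,3} .*", re.M)

def group_by_start(lines, start=START, separator="\n"):
    """Hypothetical start-regexp-only grouping: a line matching `start`
    closes the current group and opens a new one; any other line is
    appended to the current group."""
    events, buf = [], []
    for line in lines:
        if start.search(line) and buf:
            events.append(separator.join(buf))
            buf = []
        buf.append(line)
    if buf:  # final flush, standing in for flush_interval / timeout_label
        events.append(separator.join(buf))
    return events

lines = [
    "2019-03-05 06:59:37,701 WARN  akka.remote.transport.netty.NettyTransport - Remote connection to [/172.23.0.106:43358] failed\n",
    "2019-03-05 06:59:48,146 ERROR org.apache.flink.runtime.blob.BlobServerConnection - Error while executing BLOB connection.\n",
    "java.io.IOException: Unknown operation 71\n",
    "\tat org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:120)\n",
]

events = group_by_start(lines)
for event in events:
    print(repr(event[:60]))
```

Under this model the WARN line forms its own event and the ERROR line is grouped with its stack trace, which matches the behavior expected above.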