fluent / fluent-plugin-s3

Amazon S3 input and output plugin for Fluentd
https://docs.fluentd.org/output/s3

Fluentd reads each line of json file into json, instead of taking whole file as a single log stream #395

Closed YikaiHu closed 2 years ago

YikaiHu commented 2 years ago

Describe the bug

Hi all, I am new to Fluentd.

I want to read JSON log files stored in S3 and write them to an AWS Kinesis Data Stream.

When reading a file from S3, Fluentd parses each line of the JSON file as a separate record instead of treating the whole file as a single log event.

My Fluentd configuration looks like this:

####
## Output descriptions:
##
<match loghub_s3>
  @type kinesis_streams
  stream_name xxxxxxxxxxx
  region us-east-1
  <buffer>
    chunk_limit_size 1m
    flush_interval 10s
    flush_thread_count 2
  </buffer>
  debug true
</match>

####
## Source descriptions:
##

<source>
  @type s3
  tag loghub_s3

  s3_bucket xxxxxxxxxxxxxx
  s3_region us-east-1
  store_as json
  add_object_metadata true

  <instance_profile_credentials>
    ip_address 169.254.169.254
    port       80
  </instance_profile_credentials>

  <sqs>
    queue_name xxxxxxxxx
  </sqs>
  <parse>
    @type json
  </parse>
</source>

I would appreciate it if anyone could help me figure out how to read the entire JSON file as a single log event.

My JSON file in S3 looks like this:

{
    "time": "08/Mar/2022:06:28:03 +0000",
    "host": "204.48.112.175",
    "user-identifier": "-",
    "method": "POST",
    "request": "/synthesize/initiatives/integrated",
    "protocol": "HTTP/2.0",
    "status": 502,
    "bytes": 10272,
    "referer": "http://www.centralenable.name/user-centric/reintermediate/synergistic/e-business"
}

I also pointed Fluentd's output at stdout to inspect the records, and got the following:

2022-04-02 08:25:20.943252725 +0000 loghub_s3: {"message":"{\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943262261 +0000 loghub_s3: {"message":"    \"time\": \"08/Mar/2022:06:28:03 +0000\",\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943264300 +0000 loghub_s3: {"message":"    \"host\": \"204.48.112.175\",\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943265636 +0000 loghub_s3: {"message":"    \"user-identifier\": \"-\",\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943267700 +0000 loghub_s3: {"message":"    \"method\": \"POST\",\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943268871 +0000 loghub_s3: {"message":"    \"request\": \"/synthesize/initiatives/integrated\",\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943269963 +0000 loghub_s3: {"message":"    \"protocol\": \"HTTP/2.0\",\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943270955 +0000 loghub_s3: {"message":"    \"status\": 502,\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943271936 +0000 loghub_s3: {"message":"    \"bytes\": 10272,\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943273218 +0000 loghub_s3: {"message":"    \"referer\": \"http://www.centralenable.name/user-centric/reintermediate/synergistic/e-business\"\n","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
2022-04-02 08:25:20.943274295 +0000 loghub_s3: {"message":"}","s3_bucket":"loghub-logs-691546483958","s3_key":"json/json-time7.json"}
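Each stdout record above corresponds to one physical line of the uploaded file: the input splits the object on newlines and applies the `<parse>` section per line, and since no single line of a pretty-printed JSON file is a complete JSON document, each line ends up as a raw `message` field. A minimal Python sketch (an illustration of line-oriented parsing in general, not the plugin's actual code) shows the difference:

```python
import json

# Pretty-printed JSON: no physical line is valid JSON on its own,
# so a line-oriented parser falls back to emitting raw-text records.
pretty = '{\n    "status": 502,\n    "bytes": 10272\n}'

# Single-line JSON: the whole record fits on one line.
single = '{"status": 502, "bytes": 10272}'

def parse_lines(blob):
    """Try to JSON-parse each line, mimicking a line-oriented parser."""
    results = []
    for line in blob.splitlines():
        try:
            results.append(json.loads(line))
        except json.JSONDecodeError:
            # Fallback: wrap the unparseable line as a raw message,
            # similar to the {"message": "..."} records seen above.
            results.append({"message": line})
    return results

print(parse_lines(pretty))  # four raw-text fragments
print(parse_lines(single))  # one structured record
```

This matches the output above: the pretty-printed file produces one fragment per line, while a single-line file would produce exactly one structured event.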

Many thanks.

To Reproduce

1. Start Fluentd.
2. Upload the log file to the S3 bucket.
3. Observe the line-by-line parsed records in the output.

Expected behavior

The entire JSON file should be read as a single log event.

Your Environment

- Fluentd version: 1.14.3
- TD Agent version: 4.3.0
- fluent-plugin-s3 version: 1.6.1
- aws-sdk-s3 version: not sure
- aws-sdk-sqs version: not sure
- Operating system: Amazon Linux2
- Kernel version: 5.10.102-99.473.amzn2.x86_64

Your Configuration

Same configuration as shown in the description above.

Your Error Log

There is no error log, but the output is unexpected.

See the sample JSON file and stdout output in the description above.

Additional context

No response

YikaiHu commented 2 years ago

My fault: the log file should be on a single line, like this:

{"time": "08/Mar/2022:06:28:03 +0000", "host": "204.48.112.175", "user-identifier": "-", "method": "POST", "request": "/synthesize/initiatives/integrated", "protocol": "HTTP/2.0", "status": 502, "bytes": 10272, "referer": "http://www.centralenable.name/user-centric/reintermediate/synergistic/e-business"}
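If the files are produced as pretty-printed JSON, one workaround is to flatten each object to a single line (the JSON Lines / NDJSON convention) before uploading to S3, so the `json` parser emits exactly one event per record. A hedged Python sketch; the record content below is just sample data:

```python
import json

def to_ndjson(records):
    """Serialize each record as compact single-line JSON (NDJSON)."""
    return "\n".join(json.dumps(r, separators=(",", ":")) for r in records) + "\n"

records = [
    {"time": "08/Mar/2022:06:28:03 +0000", "status": 502, "bytes": 10272},
]

# Each output line is a complete JSON document that a line-oriented
# json parser can turn into exactly one event.
print(to_ndjson(records))
```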