fluent / fluent-plugin-kafka

Kafka input and output plugin for Fluentd

Kafka message rejected by OpenSearch when the JSON payload contains a field named "message" #446

Closed vishalmamidi closed 2 years ago

vishalmamidi commented 2 years ago

Describe the bug

Kafka messages are rejected by OpenSearch when the JSON Kafka message contains a field named "message".

To Reproduce

The following JSON is indexed without any issue:

    {
        "event": "created",
        "data": {
            "account_number": "222222223",
            "amount": 9900,
            "check_number": "11",
            "created_at": "2019-12-12T22:34:59Z",
            "currency": "USD",
            "deposit_date": "2019-12-12",
            "id": "9e5c22be-2145-4da4-963f-b0434765d18f",
            "lockbox_number": "12345",
            "memo_field": "Christmas Tip",
            "object": "paper_item",
            "remitter_name": "Cristina Angela",
            "routing_number": "021000021",
            "status": "pending",
            "transaction_id": null,
            "transaction_line_item_id": null,
            "updated_at": "2019-12-12T22:34:59Z"
        }
    }

However, when I rename the field "data" to "message", as in the example below, the JSON is rejected:

    {
        "event": "created",
        "message": {
            "account_number": "222222223",
            "amount": 9900,
            "check_number": "11",
            "created_at": "2019-12-12T22:34:59Z",
            "currency": "USD",
            "deposit_date": "2019-12-12",
            "id": "9e5c22be-2145-4da4-963f-b0434765d18f",
            "lockbox_number": "12345",
            "memo_field": "Christmas Tip",
            "object": "paper_item",
            "remitter_name": "Cristina Angela",
            "routing_number": "021000021",
            "status": "pending",
            "transaction_id": null,
            "transaction_line_item_id": null,
            "updated_at": "2019-12-12T22:34:59Z"
        }
    }
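Why only the second document fails: according to the error log further down, the target index already maps "message" as text, so an object value cannot be parsed into it, while "data" has no conflicting mapping. The toy model below is a simplified sketch of that check under that assumption, not OpenSearch's actual mapper; the `existing_mapping` dict and the `check_document` helper are hypothetical.

```python
from typing import Any, Dict, List

# Hypothetical, simplified view of the target index mapping (top-level fields
# only). Assumption taken from the error log: "message" is mapped as "text".
existing_mapping: Dict[str, str] = {"message": "text"}


def check_document(doc: Dict[str, Any], mapping: Dict[str, str]) -> List[str]:
    """Return conflicts between top-level values and their mapped types.

    Toy rule: a field mapped as text/keyword rejects a JSON object (dict).
    Fields absent from the mapping are accepted, mimicking dynamic mapping.
    """
    conflicts = []
    for field, value in doc.items():
        mapped = mapping.get(field)
        if mapped in ("text", "keyword") and isinstance(value, dict):
            conflicts.append(f"failed to parse field [{field}] of type [{mapped}]")
    return conflicts


payload = {"account_number": "222222223", "amount": 9900}
ok_doc = {"event": "created", "data": payload}       # first example: accepted
bad_doc = {"event": "created", "message": payload}   # second example: rejected

print(check_document(ok_doc, existing_mapping))   # []
print(check_document(bad_doc, existing_mapping))  # ['failed to parse field [message] of type [text]']
```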

Expected behavior

Both documents, with and without the "message" field, should be indexed successfully.

Your Environment

- Fluentd version: 1.14.3
- fluent-plugin-kafka version: 0.17.3
- ruby-kafka version: 
- Operating system: CentOS
- Kernel version:

Your Configuration

    <source>
      @type kafka_group
      brokers b-1.amazonaws.com:9092,b-2.amazonaws.com:9092
      consumer_group amazon.broker-2
      topics jb-audit-user
      format json
    </source>

    <match jb-audit-user>
      @type opensearch

      ssl_verify false
      @log_level debug

      logstash_format true
      logstash_prefix jb-audit-user   # defaults to "logstash"
      logstash_prefix_separator -     # defaults to "-"
      logstash_dateformat %Y.%m       # defaults to "%Y.%m.%d"

      user "xx"
      password "xx"

      <endpoint>
        url "https://vpc.es.amazonaws.com:443"
        region "us-east-1"
      </endpoint>

      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes-jb-audit-user.system.buffer
        flush_mode interval
        flush_interval 1s
        flush_thread_count 4
        chunk_full_threshold 0.9
      </buffer>
    </match>
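With logstash_format true, records are written to a time-based index built from logstash_prefix, logstash_prefix_separator and logstash_dateformat, so the mapping worth inspecting is the one of the monthly index the failing event lands in. A minimal sketch of that naming, assuming the event time is formatted in UTC; `index_name` is a hypothetical helper, not the plugin's own code.

```python
from datetime import datetime, timezone


def index_name(prefix: str, separator: str, dateformat: str, event_time: datetime) -> str:
    """Compose a logstash-style index name: <prefix><separator><formatted date>.

    Assumption: the event time is converted to UTC before formatting.
    """
    return f"{prefix}{separator}{event_time.astimezone(timezone.utc).strftime(dateformat)}"


# Event time taken from the error log below (2022-02-14 12:49:10 +0000).
t = datetime(2022, 2, 14, 12, 49, 10, tzinfo=timezone.utc)
print(index_name("jb-audit-user", "-", "%Y.%m", t))  # jb-audit-user-2022.02
```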

Your Error Log

2022-02-14 12:49:13 +0000 [warn]: #0 dump an error event: error_class=Fluent::Plugin::OpenSearchErrorHandler::OpenSearchError error="400 - Rejected by OpenSearch [error type]: mapper_parsing_exception [reason]: 'failed to parse field [message] of type [text] in document with id 'Xe9H-H4BIPPY1LUJqWRI'. Preview of field's value: '{website=https://SrinivasStudios.com, notes=, applicationActive=false, parentEntity=Srinivas Studios, createdDateTime=1643961318752, emailAddress=support@srinivasstudios.com, clientType=Consumer, statusComments=Approved by JB 14-02-2022 06:19 PM IST, createdBy=null, contact=[], name=Srinivas Studios, clientSegment=Proprietary Trading Organizations, faxNumber=8979879879, modifiedBy=null, modifiedDateTime=1644842948940, id=143001, statusUpdateTime=1644842948000, status=Approved}''" location=nil tag="jb-audit-user" time=2022-02-14 12:49:10.332082686 +0000 record={"X-corelation-ID"=>nil, "requestId"=>"67726045-a99e-42ab-a485-faf8544ecf5a", "eventIndex"=>"jb-audit-user", "message"=>{"createdBy"=>nil, "createdDateTime"=>1643961318752, "modifiedBy"=>nil, "modifiedDateTime"=>1644842948940, "id"=>143001, "name"=>"Srinivas Studios", "emailAddress"=>"support@srinivasstudios.com", "clientType"=>"Consumer", "clientSegment"=>"Proprietary Trading Organizations", "faxNumber"=>"8979879879", "website"=>"https://SrinivasStudios.com", "parentEntity"=>"Srinivas Studios", "notes"=>"", "status"=>"Approved", "statusUpdateTime"=>1644842948000, "statusComments"=>"Approved by JB 14-02-2022 06:19 PM IST", "contact"=>[], "applicationActive"=>false}, "event"=>"POSTUPDATE", "eventTimestamp"=>1644842948941}
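The useful part of this log is the [reason]: OpenSearch names the field it could not parse and the type that field is already mapped to. A small sketch that extracts both from such an error string; `conflicting_field` is a hypothetical helper, and the pattern is written against this one message, so it is an assumption rather than a stable format guarantee.

```python
import re
from typing import Optional, Tuple

# Matches e.g. "failed to parse field [message] of type [text]".
# Assumption: the wording stays as in this log; adjust if it differs.
FIELD_TYPE_RE = re.compile(r"failed to parse field \[([^\]]+)\] of type \[([^\]]+)\]")


def conflicting_field(error: str) -> Optional[Tuple[str, str]]:
    """Return (field_name, mapped_type) from a mapper_parsing_exception, or None."""
    match = FIELD_TYPE_RE.search(error)
    return (match.group(1), match.group(2)) if match else None


log = ("400 - Rejected by OpenSearch [error type]: mapper_parsing_exception "
       "[reason]: 'failed to parse field [message] of type [text] in document "
       "with id 'Xe9H-H4BIPPY1LUJqWRI'.")
print(conflicting_field(log))  # ('message', 'text')
```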


Additional context

_No response_

raytung commented 2 years ago

Hey @vishalmamidi, this does not look like an issue with the Kafka plugin but rather something specific to your OpenSearch setup. Please check your index mapping and confirm whether the message field is mapped as text, which is what the error log indicates ("failed to parse field [message] of type [text]"). A field mapped as text cannot hold a JSON object, so a record whose message value is an object is rejected.
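One way to do that check is OpenSearch's get field mapping API, `GET /<index>/_mapping/field/message`. The sketch below only interprets a response body; `mapped_types` is a hypothetical helper, and the sample response (including both index names) is an illustrative assumption shaped like that API's output (index, then mappings, then field, then mapping), not data captured from this cluster.

```python
import json
from typing import Dict


def mapped_types(response_body: str, field: str) -> Dict[str, str]:
    """Return {index_name: mapped_type} for a top-level `field`.

    Expects the JSON body of GET /<index>/_mapping/field/<field>.
    Indices where the field does not appear are reported as "unmapped".
    """
    types = {}
    for index, body in json.loads(response_body).items():
        entry = body.get("mappings", {}).get(field)
        if entry is None:
            types[index] = "unmapped"
        else:
            types[index] = entry["mapping"][field].get("type", "unknown")
    return types


# Hypothetical response for two monthly indices.
sample = json.dumps({
    "jb-audit-user-2022.02": {"mappings": {"message": {
        "full_name": "message", "mapping": {"message": {"type": "text"}}}}},
    "jb-audit-user-2022.01": {"mappings": {}},
})
print(mapped_types(sample, "message"))
# {'jb-audit-user-2022.02': 'text', 'jb-audit-user-2022.01': 'unmapped'}
```

If an index reports text, records whose message value is an object will keep being rejected by that index.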

github-actions[bot] commented 2 years ago

This issue has been automatically marked as stale because it has been open 90 days with no activity. Remove the stale label or comment, or this issue will be closed in 30 days.

github-actions[bot] commented 2 years ago

This issue was automatically closed because it remained stale for 30 days.