opensearch-project / data-prepper

OpenSearch Data Prepper is a component of the OpenSearch project that accepts, filters, transforms, enriches, and routes data at scale.
https://opensearch.org/docs/latest/clients/data-prepper/index/
Apache License 2.0

Add WAF log convertor before sending to OpenSearch #2305

Open YikaiHu opened 1 year ago

YikaiHu commented 1 year ago

Is your feature request related to a problem? Please describe.

It would be nice to have a convertor for WAF logs that runs before the data is sent to OpenSearch. This would help when we want to build dashboards on the data.

Describe the solution you'd like

Additional context

Here is some parser code from the Centralized Logging with OpenSearch solution:

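import json
import logging
import re

logger = logging.getLogger(__name__)


# LogType is the base class from the surrounding solution code (not shown here).
class WAF(LogType):
    """An implementation of LogType for WAF Logs"""

    _format = "json"

    def parse(self, line: str):
        """Parse one WAF log line and add webaclName, host and userAgent fields."""
        try:
            json_record = json.loads(line)

            # Extract the web ACL name: the path segment that follows "/webacl/".
            # If the pattern does not match, re.search returns None and the
            # resulting AttributeError is handled by the except clause below.
            json_record["webaclName"] = re.search(
                r"[^/]/webacl/([^/]*)", json_record["webaclId"]
            ).group(1)

            # Copy the Host and User-Agent header values to top-level fields.
            headers = json_record["httpRequest"]["headers"]
            for header in headers:
                if header["name"].lower() == "host":
                    json_record["host"] = header["value"]
                elif header["name"].lower() == "user-agent":
                    json_record["userAgent"] = header["value"]
            return json_record
        except Exception as e:
            # Any malformed record is logged and replaced by an empty dict.
            logger.error(e)
            return {}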
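
To make the expected input and output concrete, here is a minimal standalone sketch of the same enrichment logic applied to one record. The function name `enrich_waf_record` and the sample values (ACL name, ARN, account ID, header values) are made up for illustration and are not taken from the solution code or from a real log.

```python
import json
import re


def enrich_waf_record(line: str) -> dict:
    """Standalone version of the parse logic above (hypothetical helper)."""
    record = json.loads(line)

    # The web ACL name is the path segment that follows "/webacl/".
    match = re.search(r"[^/]/webacl/([^/]*)", record["webaclId"])
    if match:
        record["webaclName"] = match.group(1)

    # Promote the Host and User-Agent headers to top-level fields.
    for header in record["httpRequest"]["headers"]:
        name = header["name"].lower()
        if name == "host":
            record["host"] = header["value"]
        elif name == "user-agent":
            record["userAgent"] = header["value"]
    return record


# Hypothetical sample line; only the fields used by the parser are included.
sample_line = json.dumps(
    {
        "webaclId": "arn:aws:wafv2:us-east-1:123456789012:regional/webacl/ExampleWebACL/473e64fd",
        "httpRequest": {
            "headers": [
                {"name": "Host", "value": "example.com"},
                {"name": "User-Agent", "value": "curl/7.88.1"},
                {"name": "Accept", "value": "*/*"},
            ]
        },
    }
)

enriched = enrich_waf_record(sample_line)
print(enriched["webaclName"], enriched["host"], enriched["userAgent"])
```

The three added top-level fields (`webaclName`, `host`, `userAgent`) are the ones a dashboard would filter or aggregate on; the original nested `httpRequest` structure is left in place.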
dlvenable commented 1 year ago

Thank you @YikaiHu for raising this.

Are you reading your log files from S3 or from another location? Are WAF logs coming as multiline JSON objects or as a JSON array?

If you are reading from S3 and it is a JSON array, you can use the S3 source with the json codec.

source:
  s3:
    notification_type: "sqs"
    codec:
      json:
    compression: gzip
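
Whether the json codec fits depends on the shape of the objects in the bucket. One way to check is to download a single object and inspect it; the sketch below assumes the object has already been read into bytes. `classify_json_payload` is a hypothetical helper written for this thread and is not part of Data Prepper.

```python
import gzip
import json


def classify_json_payload(raw: bytes) -> str:
    """Return "array", "object", "ndjson" or "unknown" for a log payload."""
    # Gzip streams start with the magic bytes 0x1f 0x8b.
    if raw[:2] == b"\x1f\x8b":
        raw = gzip.decompress(raw)
    text = raw.decode("utf-8").strip()

    # First try to parse the whole payload as one JSON document.
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, list):
        return "array"
    if isinstance(parsed, dict):
        return "object"

    # Otherwise check whether every non-empty line is its own JSON object.
    try:
        records = [json.loads(line) for line in text.splitlines() if line.strip()]
    except json.JSONDecodeError:
        return "unknown"
    if records and all(isinstance(r, dict) for r in records):
        return "ndjson"
    return "unknown"


print(classify_json_payload(b'[{"action": "ALLOW"}, {"action": "BLOCK"}]'))
print(classify_json_payload(b'{"action": "ALLOW"}\n{"action": "BLOCK"}\n'))
```

A result of "array" matches the case described above; "ndjson" means one JSON object per line, which is the other layout asked about.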

The grok processor may be able to support your regex. Can you provide a sample input for the webacl and an example output?

I'm not sure that our copy_values processor can copy from the headers in the way you need. However, we might be able to provide a generic solution for this.

dlvenable commented 1 year ago

@YikaiHu,

I believe the problem of mutating the headers is somewhat generic. I proposed #2410 as a possible processor which can solve this issue. Feel free to read the issue and make comments.

In your case, I believe with this hypothetical processor you could have a pipeline somewhat like the following:

processor:
  - list_to_map:
      source: httpRequest/headers
      target: headers
      key: name
      value_key: value
      flatten: true
  - rename_keys:
      entries:
        - from_key: headers/host
          to_key: host
        - from_key: headers/user-agent
          to_key: userAgent

Please let me know if this looks like something that could work for you.
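
To illustrate the intended effect of the pipeline above on a single event, here is a rough Python simulation. The semantics are assumptions, since the list_to_map processor is only a proposal at this point: the list at `source` is turned into a map stored under `target`, with `flatten: true` taken to mean one value per key, and `rename_keys` then moves nested keys to the top level. The function names and sample event are hypothetical.

```python
def list_to_map(event: dict, source: str, target: str, key: str, value_key: str) -> dict:
    """Assumed behaviour: build {item[key]: item[value_key]} and store it under target."""
    items = event
    for part in source.split("/"):
        items = items[part]
    # With one value per key (the assumed meaning of flatten), later duplicates win.
    event[target] = {item[key]: item[value_key] for item in items}
    return event


def rename_keys(event: dict, entries: list) -> dict:
    """Assumed behaviour: move each from_key (slash-separated path) to a top-level to_key."""
    for entry in entries:
        *parents, leaf = entry["from_key"].split("/")
        parent = event
        for part in parents:
            parent = parent.get(part, {})
        if leaf in parent:
            event[entry["to_key"]] = parent.pop(leaf)
    return event


# Hypothetical event; header names are assumed to be lowercase already.
event = {
    "httpRequest": {
        "headers": [
            {"name": "host", "value": "example.com"},
            {"name": "user-agent", "value": "curl/7.88.1"},
            {"name": "accept", "value": "*/*"},
        ]
    }
}

list_to_map(event, source="httpRequest/headers", target="headers", key="name", value_key="value")
rename_keys(
    event,
    [
        {"from_key": "headers/host", "to_key": "host"},
        {"from_key": "headers/user-agent", "to_key": "userAgent"},
    ],
)
print(event["host"], event["userAgent"], event["headers"])
```

One difference from the Python parser in the original request: that code lowercases header names before comparing, while this sketch matches keys exactly, so mixed-case names such as `User-Agent` would need to be normalized first.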

At the same time, we are looking into creating a small language for mutating events, something that would allow for greater flexibility. However, this would likely take more time to provide than the option above.