elastic / elastic-serverless-forwarder

Elastic Serverless Forwarder

Forward structured application JSON logs, optionally enriched with ECS fields #258

Closed · pushred closed this issue 1 year ago

pushred commented 1 year ago

Describe the enhancement:

I am using the Kinesis input to collect structured logs from AWS CloudWatch that have been partly populated using the ecs-pino-format library. Inspecting the data output to Elasticsearch, I found that there are a couple of layers of structure around my application logs:

{
  "@timestamp": "2023-02-14T21:26:04.387857Z",
  "message": "{\"id\":\"37385191276969030971785538657917125344955573022299717635\",\"timestamp\":1676409955997,\"message\":\"{\\\"log.level\\\":\\\"info\\\",\\\"@timestamp\\\":\\\"2023-02-14T21:25:55.996Z\\\",\\\"message\\\":\\\"beepboop\\\"}\"}",
}

I was expecting to see my application logs as-is. This could be achieved if:

  • The CloudWatch log record structure was parsed by ESF
  • The log record message string was attempted to be parsed as JSON

    • If message is valid JSON, merge the content into the root object that is forwarded
    • If message is not JSON, preserve it as-is
  • Optionally add (or overwrite/merge) ECS objects with data from ESF
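For illustration, using the sample record above, the document I was hoping ESF would forward looks something like this (purely a mock-up of the desired outcome, built from the values quoted above; it is not what ESF produces today):

{
  "@timestamp": "2023-02-14T21:25:55.996Z",
  "log.level": "info",
  "message": "beepboop"
}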

Describe a specific use case for the enhancement or feature:

When I initially saw this I thought that it was a reasonable preservation of the various layers my logs are passing through. I assumed that I could use ingest pipelines to parse each layer, extract my logs, and merge them into the ESF structure in order to leverage some of its ECS content while adding my own. For reference, here's a portion of my pipeline:

{
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "json": {
        "field": "parsed_cloudwatch_log_event.message",
        "add_to_root": true
      }
    },
    {
      "set": {
        "field": "@timestamp",
        "copy_from": "parsed_app_log.@timestamp",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "message",
        "copy_from": "parsed_app_log.message",
        "ignore_failure": true
      }
    },
    {
      "remove": {
        "field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "remove": {
        "field": "parsed_app_log"
      }
    }
  ]
}

However, I have encountered some limitations in the pipeline processors that are currently blocking me from accessing a log.level field that ecs-pino-format is adding.

In my own case I may need to stop using ecs-pino-format as I don't see the blocking issues in the data processors being resolved anytime soon. I don't really expect that this enhancement will be implemented either. But I wanted to at least document what I'm dealing with as a user.

aspacca commented 1 year ago

hello @pushred, sorry for the late reply

just to be sure I got your scenario right: you send the logs formatted with ecs-pino-format to cloudwatch logs and then have a subscription from the log group to a kinesis data stream that you use as the input of the forwarder, is that correct?

would you be able to use the cloudwatch logs log group (or log stream) as a direct input of the forwarder? in that case the original json logs won't be wrapped in the subscription format of the kinesis data stream and you won't need any unwrapping

I have the impression you have already added the expand_event_list_from_field: logEvents in your config, can you confirm?

I was expecting to see my application logs as-is. This could be achieved if:

  • The CloudWatch log record structure was parsed by ESF
  • The log record message string was attempted to be parsed as JSON

    • If message is valid JSON, merge the content into the root object that is forwarded
    • If message is not JSON, preserve it as-is
  • Optionally add (or overwrite/merge) ECS objects with data from ESF

I understand your expectation but that's not the current behaviour of the ESF

in your scenario each kinesis record has the following payload

{
    "owner": "111111111111",
    "logGroup": "CloudTrail/logs",
    "logStream": "111111111111_CloudTrail/logs_us-east-1",
    "subscriptionFilters": [
        "RecipientStream"
    ],
    "messageType": "DATA_MESSAGE",
    "logEvents": [
        {
            "id": "3195310660696698337880902507980421114328961542429EXAMPLE",
            "timestamp": 1432826855000,
            "message": "{\"log.level\":\"info\",\"@timestamp\":\"2023-02-14T21:25:55.996Z\",\"message\":\"beepboop\"}"
        },
        ...
    ]
}

this payload is already parsed as JSON, and on top of that you expand the entries in {"logEvents": [{forwarded event #1},{forwarded event #2}, etc]}

the problem is that {forwarded event #N} is another nested level of JSON: what you would like to extract and forward as the event is the content of its {"message": ...} field

if you had the cloudwatch log group/stream directly as input of the forwarder the scenario would be different: in that case each log event message of the log group would have the following payload:

{"log.level":"info","@timestamp":"2023-02-14T21:25:55.996Z","message":"beepboop"}

and the forwarder would send the following event to ES:

{
  "@timestamp": "2023-02-14T21:26:04.387857Z",
  "message": "{\"log.level\":\"info\",\"@timestamp\":\"2023-02-14T21:25:55.996Z\",\"message\":\"beepboop\"}",
}

I would expect you would then be able to run the following processor (or something similar):

{
  "processors": [
    {
      "json": {
        "field": "message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "replace"
      }
    }
  ]
}
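
to double check what that actually produces, the pipeline can be exercised with the _simulate API before wiring it up; a minimal sketch, reusing the sample event from this thread (the request shape is the standard simulate call, nothing ESF-specific):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "json": {
          "field": "message",
          "add_to_root": true,
          "add_to_root_conflict_strategy": "replace"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "@timestamp": "2023-02-14T21:26:04.387857Z",
        "message": "{\"log.level\":\"info\",\"@timestamp\":\"2023-02-14T21:25:55.996Z\",\"message\":\"beepboop\"}"
      }
    }
  ]
}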

it's unclear to me where the parsed_app_log field in your ingestion pipeline comes from

even assuming you stick to the format of the subscription filter to the kinesis data stream you should be able to run something like the following ingest pipeline:

{
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "json": {
        "field": "parsed_cloudwatch_log_event.message",
        "add_to_root": true,
        "add_to_root_conflict_strategy": "replace"
      }
    },
    {
      "remove": {
        "field": "parsed_cloudwatch_log_event"
      }
    }
  ]
}
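
the same _simulate approach works for this variant too; a sketch against the kinesis-wrapped payload (the escaped message value is just the sample from this thread):

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      { "json": { "field": "message", "target_field": "parsed_cloudwatch_log_event" } },
      { "json": { "field": "parsed_cloudwatch_log_event.message", "add_to_root": true, "add_to_root_conflict_strategy": "replace" } },
      { "remove": { "field": "parsed_cloudwatch_log_event" } }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "{\"id\":\"3195310660696698337880902507980421114328961542429EXAMPLE\",\"timestamp\":1432826855000,\"message\":\"{\\\"log.level\\\":\\\"info\\\",\\\"@timestamp\\\":\\\"2023-02-14T21:25:55.996Z\\\",\\\"message\\\":\\\"beepboop\\\"}\"}"
      }
    }
  ]
}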

am I missing anything?

there's no plan at the moment to add a feature that merges the content of a JSON field of the forwarded event into the root of the event directly in the forwarder: this kind of processing is intended to be handled in an ingest pipeline, once you are able to isolate the single events to be forwarded (for example by using expand_event_list_from_field, as in this case)

pushred commented 1 year ago

just to be sure I got your scenario right: you send the logs formatted with ecs-pino-format to cloudwatch logs and then have a subscription from the log group to a kinesis data stream that you use as the input of the forwarder

Exactly

would you be able to use the cloudwatch logs log group (or log stream) as a direct input of the forwarder? in that case the original json logs won't be wrapped in the subscription format of the kinesis data stream and you won't need any unwrapping

We pursued this option originally but the issues noted in https://github.com/elastic/elastic-serverless-forwarder/issues/219 led us to Kinesis.

I have the impression you have already added the expand_event_list_from_field: logEvents in your config, can you confirm?

Correct, we are using this and it does remove the Kinesis structure.

even assuming you stick to the format of the subscription filter to the kinesis data stream you should be able to run something like the following ingest pipeline

I actually tried a pipeline like this and ran into this error:

{
  "root_cause": [
    {
      "type": "illegal_argument_exception",
      "reason": "cannot add non-map fields to root of document"
    }
  ],
  "type": "illegal_argument_exception",
  "reason": "cannot add non-map fields to root of document"
}

We weren't able to make sense of it, however, and ended up with a very verbose pipeline that uses set processors to copy values from parsed_app_log into the root individually. Perhaps this is a bug I should file an issue for somewhere; not sure where though.
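
For reference, the workaround pipeline looks roughly like this (a sketch reconstructed from the description above: the second json processor is assumed to write to a parsed_app_log field instead of the root, and the list of copied fields is abbreviated):

{
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "parsed_cloudwatch_log_event"
      }
    },
    {
      "json": {
        "field": "parsed_cloudwatch_log_event.message",
        "target_field": "parsed_app_log"
      }
    },
    {
      "set": {
        "field": "@timestamp",
        "copy_from": "parsed_app_log.@timestamp",
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "message",
        "copy_from": "parsed_app_log.message",
        "ignore_failure": true
      }
    },
    {
      "remove": {
        "field": ["parsed_cloudwatch_log_event", "parsed_app_log"],
        "ignore_failure": true
      }
    }
  ]
}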

there's no plan at the moment to add a feature that merges the content of a JSON field of the forwarded event into the root of the event directly in the forwarder: this kind of processing is intended to be handled in an ingest pipeline

Understood. In the end the ingest pipeline did meet our needs, and I can see the benefit of not abstracting this away and preserving flexibility. I also understand the decision noted elsewhere to delegate data transformation to ingest pipelines.

This was just not a trivial "out of the box" experience, especially for someone like me who hasn't worked with the Elastic Stack much before. The number of Lambda functions in our project is definitely a complicating factor and maybe not so common. I thought I should file an issue to provide the feedback, and as a reference for anyone else who might face similar obstacles in the future.

aspacca commented 1 year ago

We weren't able to make sense of it, however, and ended up with a very verbose pipeline that uses set processors to copy values from parsed_app_log into the root individually. Perhaps this is a bug I should file an issue for somewhere; not sure where though.

I'd suggest you open a thread at https://discuss.elastic.co/c/elastic-stack/elasticsearch/6 about that. not sure if it's a bug or a missing feature, or if there is any workaround. in case it is a bug, the repo to file it in would be the elasticsearch one

This was just not a trivial "out of the box" experience, especially for someone like me who hasn't worked with the Elastic Stack much before. The number of Lambda functions in our project is definitely a complicating factor and maybe not so common. I thought I should file an issue to provide the feedback, and as a reference for anyone else who might face similar obstacles in the future.

sure, you did right to file an issue: the knowledge will stay here for future users facing the same problems, and we plan to collect this kind of feedback in a knowledge base related to the forwarder