vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Ingesting AWS CloudWatch Logs via AWS Kinesis Firehose #18830

Open rliskunov opened 11 months ago

rliskunov commented 11 months ago

Problem

I was following the guide "Ingesting AWS CloudWatch Logs via AWS Kinesis Firehose": https://vector.dev/guides/advanced/cloudwatch-logs-firehose/

Vector is deployed in a cluster, and HTTPS access is provided through an Ingress (nginx). Additionally, I shortened the transforms.parse part so that the full information from CloudWatch is printed to the console. However, no logs arrive in Vector when requests are submitted.

I have attached the log output produced when I make a request directly to Vector, where I expect to get MethodNotAllowed.

Configuration

api:
  address: 127.0.0.1:8686
  enabled: true
  playground: false
data_dir: /vector-data-dir
sinks:
  console:
    encoding:
      codec: json
    inputs:
    - firehose
    type: console
sources:
  firehose:
    access_keys:
    - test123
    address: 0.0.0.0:8080
    store_access_key: false
    type: aws_kinesis_firehose
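
Firehose delivers to HTTP endpoints with POST requests, so a manual test against this source should be a POST as well. The sketch below only builds such a request for the configuration above (port 8080, access key test123) and does not send it. The header names follow the Firehose HTTP endpoint delivery format; whether this Vector version requires every one of them is an assumption. The source ARN, the request ID, and the helper name build_test_request are hypothetical values chosen for illustration.

```python
import json
import urllib.request

# Hypothetical placeholder; a real delivery stream ARN would go here.
EXAMPLE_SOURCE_ARN = "arn:aws:firehose:us-east-1:111111111111:deliverystream/test"


def build_test_request(url, access_key, body,
                       request_id="test-request-1",
                       source_arn=EXAMPLE_SOURCE_ARN):
    """Build (but do not send) a Firehose-style POST request."""
    headers = {
        "Content-Type": "application/json",
        "X-Amz-Firehose-Protocol-Version": "1.0",
        "X-Amz-Firehose-Request-Id": request_id,
        "X-Amz-Firehose-Source-Arn": source_arn,
        # Must match one of the source's configured access_keys.
        "X-Amz-Firehose-Access-Key": access_key,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="POST",
    )


# An empty record list keeps the example small; real requests carry records.
req = build_test_request(
    "http://localhost:8080/",
    "test123",
    {"requestId": "test-request-1", "timestamp": 0, "records": []},
)
# Sending would be: urllib.request.urlopen(req)
```

Calling urllib.request.urlopen(req) against a reachable Vector instance would send the request. When testing through the Ingress, the URL would be the HTTPS hostname instead of localhost.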

Version

0.33.0-distroless-libc

Debug Output

DEBUG vector::app: Internal log rate limit configured. internal_log_rate_secs=10
INFO vector::app: Log level is enabled. level="vector=trace,codec=trace,vrl=trace,file_source=trace,tower_limit=trace,rdkafka=trace,buffers=trace,lapin=trace,kube=trace"
DEBUG vector::app: messaged="Building runtime." worker_threads=4
INFO vector::app: Loading configs. paths=["/etc/vector"]
DEBUG vector::config::loading: No secret placeholder found, skipping secret resolution.
DEBUG vector::topology::builder: Building new source. component=firehose
DEBUG vector::topology::builder: Building new sink. component=console
INFO vector::topology::running: Running healthchecks.
DEBUG vector::topology::running: Connecting changed/added component(s).
DEBUG vector::topology::running: Configuring outputs for source. component=firehose
DEBUG vector::topology::running: Configuring output for component. component=firehose output_id=None
DEBUG vector::topology::running: Connecting inputs for sink. component=console
DEBUG vector::topology::running: Adding component input to fanout. component=console fanout_id=firehose
DEBUG vector::topology::running: Spawning new source. key=firehose
INFO vector::topology::builder: Healthcheck passed.
TRACE vector::topology::running: Spawning new sink. key=console
DEBUG source{component_kind="source" component_id=firehose component_type=aws_kinesis_firehose component_name=firehose}: vector::topology::builder: Source starting.
DEBUG source{component_kind="source" component_id=firehose component_type=aws_kinesis_firehose component_name=firehose}: vector::topology::builder: Source pump supervisor starting.
DEBUG source{component_kind="source" component_id=firehose component_type=aws_kinesis_firehose component_name=firehose}: vector::topology::builder: Source pump starting.
DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::topology::builder: Sink starting.
INFO vector: Vector has started. debug="false" version="0.33.0" arch="x86_64" revision="89605fb 2023-09-27 14:18:24.180809939"
DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.18685914943425674
INFO vector::internal_events::api: API server running. address=127.0.0.1:8686 playground=off
DEBUG sink{component_kind="sink" component_id=console component_type=console component_name=console}: vector::utilization: utilization=0.018685958499412784
ERROR source{component_kind="source" component_id=firehose component_type=aws_kinesis_firehose component_name=firehose}: vector::internal_events::aws_kinesis_firehose: Error occurred while handling request. error="Rejection(MethodNotAllowed)" stage="receiving" error_type="request_failed" error_code=http_response_500 request_id= internal_log_rate_limit=true
ERROR source{component_kind="source" component_id=firehose component_type=aws_kinesis_firehose component_name=firehose}: vector::internal_events::aws_kinesis_firehose: Internal log [Error occurred while handling request.] is being suppressed to avoid flooding.

Example Data

{
  "messageType": "DATA_MESSAGE",
  "owner": "111111111111",
  "logGroup": "test",
  "logStream": "test",
  "subscriptionFilters": [
    "Destination"
  ],
  "logEvents": [
    {
      "id": "35683658089614582423604394983260738922885519999578275840",
      "timestamp": 1600110569039,
      "message": {
        "bytes": 26780,
        "datetime": "14/Sep/2020:11:45:41 -0400",
        "host": "157.130.216.193",
        "method": "PUT",
        "protocol": "HTTP/1.0",
        "referer": "https://www.principalcross-platform.io/markets/ubiquitous",
        "request": "/expedite/convergence",
        "source_type": "stdin",
        "status": 301,
        "user-identifier": "-"
      }
    }
  ]
}
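
For reference, Firehose does not send a message like the one above as-is: CloudWatch Logs gzip-compresses subscription data, and Firehose base64-encodes each record inside a JSON envelope. The following is a minimal sketch of that wrapping and its inverse, assuming the envelope fields requestId, timestamp, and records from the Firehose HTTP endpoint delivery format. The helper names build_firehose_body and decode_record are hypothetical, and the sample message is a trimmed version of the example data.

```python
import base64
import gzip
import json
import time
import uuid
from typing import Optional


def build_firehose_body(cloudwatch_message: dict,
                        request_id: Optional[str] = None) -> dict:
    """Wrap one CloudWatch Logs message in a Firehose-style request body."""
    raw = json.dumps(cloudwatch_message).encode("utf-8")
    # gzip first (as CloudWatch Logs does), then base64 (as Firehose does).
    data = base64.b64encode(gzip.compress(raw)).decode("ascii")
    return {
        "requestId": request_id or str(uuid.uuid4()),
        "timestamp": int(time.time() * 1000),  # milliseconds since epoch
        "records": [{"data": data}],
    }


def decode_record(record: dict) -> dict:
    """Reverse the encoding: base64-decode, gunzip, parse JSON."""
    return json.loads(gzip.decompress(base64.b64decode(record["data"])))


sample = {
    "messageType": "DATA_MESSAGE",
    "owner": "111111111111",
    "logGroup": "test",
    "logStream": "test",
    "subscriptionFilters": ["Destination"],
    "logEvents": [
        {"id": "1", "timestamp": 1600110569039, "message": "hello"},
    ],
}

body = build_firehose_body(sample, request_id="test-request-1")
```

Passing body["records"][0] to decode_record returns the original message, which is a quick way to check a hand-built payload before posting it to the source.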

Additional Context

A subscription filter has been created for the log group:

aws logs put-subscription-filter \
  --log-group-name ${LOG_GROUP} \
  --filter-name "Destination" \
  --filter-pattern "" \
  --destination-arn "arn:aws:firehose:${AWS_REGION}:${AWS_ACCOUNT_ID}:deliverystream/${FIREHOSE_DELIVERY_STREAM}" \
  --role-arn "arn:aws:iam::${AWS_ACCOUNT_ID}:role/CWLtoKinesisFirehoseRole"

neuronull commented 10 months ago

Hello,

Inspecting the error message a bit:

ERROR source{component_kind="source" component_id=firehose component_type=aws_kinesis_firehose component_name=firehose}: vector::internal_events::aws_kinesis_firehose: Error occurred while handling request. error="Rejection(MethodNotAllowed)" stage="receiving" error_type="request_failed" error_code=http_response_500 request_id= internal_log_rate_limit=true

Within the source code, I see that we (Vector) are the ones setting that HTTP 500 response code, which we do when we can't find a specific error code that matches what we received.

The more useful piece is what you called out:

Rejection(MethodNotAllowed)

That is being returned from AWS. In this spec I saw the following description:

"The specified method is not allowed against this resource. Modify the bucket’s policy to allow the correct Amazon S3 operation permissions."

So my first recommendation/hunch would be that there is something going on with the configuration on the AWS side.