vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0

Support dead letter queue on sinks #1772

Open Jeffail opened 4 years ago

Jeffail commented 4 years ago

In the context of an event processor, a dead letter queue can mean a number of things. We can already support content-based DLQs by using transforms to route certain events to a secondary sink when they aren't suitable for our primary sink.
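
For reference, a rough sketch of how that content-based routing can be wired up today with a route transform (the component names and the condition here are just illustrative):

# Route events by content; anything that doesn't match a named route
# comes out of the transform's "_unmatched" output.
[transforms.split]
  inputs = [ "somewhere" ]
  type = "route"
  route.parseable = '.message != null'

[sinks.primary]
  inputs = [ "split.parseable" ]
  type = "something"

# Content-based "DLQ": events the primary sink can't handle.
[sinks.secondary]
  inputs = [ "split._unmatched" ]
  type = "something_else"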

However, it would also be nice to support dead letter queuing by chaining sinks as fallback options, where failed sends can be routed under certain circumstances (rather than retrying in an infinite loop, or being dropped completely).

It'd be cool to be able to do something like this:

[sinks.foo]
  inputs = [ "somewhere" ]
  type = "something"
  [sinks.foo.dlq]
    after_retries = 3

[sinks.bar]
  inputs = [ "foo.dlq" ]
  type = "something_else"

And since we're just producing output from sinks here, there's no reason we can't add transforms in there as well:

[sinks.foo]
  inputs = [ "somewhere" ]
  type = "something"
  [sinks.foo.dlq]
    after_retries = 3

[transforms.baz]
  inputs = [ "foo.dlq" ]
  type = "clean_up"

[sinks.bar]
  inputs = [ "baz" ]
  type = "something_else"

binarylogic commented 4 years ago

I support this. I'm curious whether this could be architected in a way that wouldn't require a ton of manual, per-sink work.

jszwedko commented 4 years ago

Noting here that it'd be useful to "replay" messages from the DLQs easily.

jszwedko commented 3 years ago

Use case from Discord: https://discord.com/channels/742820443487993987/746070591097798688/875360138612064286

Hi guys,
I'm having some errors on a Vector instance that is responsible for reading messages from Kafka and sinking them to Elasticsearch.
There are two kinds of messages:
ERROR sink{component_kind="sink" component_name=elasticsearch_sink_out component_type=elasticsearch}:request{request_id=2929383}: vector::sinks::util::retries: Not retriable; dropping the request. reason="error type: mapper_parsing_exception, reason: object mapping for [deactivated_id] tried to parse field [null] as object, but found a concrete value"

ERROR sink{component_kind="sink" component_name=elasticsearch_sink_out component_type=elasticsearch}:request{request_id=2929418}: vector::sinks::util::retries: Not retriable; dropping the request. reason="error type: mapper_parsing_exception, reason: failed to parse field [price_effective_date_active] of type [date]"

However, as I use one sink for multiple topics, I'm unable to tell which topic the error relates to, since those fields are present in multiple topics as well.
Is there any possibility to customize the logs to include more information?
I tried debug logs, but didn't get much out of them.
If I could have the source topic and an offset, or at least the target index (one topic goes to one index), that would be really helpful.

shamil commented 2 years ago

Any news regarding this? We desperately need this DLQ feature. Logstash's DLQ helps us a lot, and this is the only feature that keeps us from migrating.

jszwedko commented 2 years ago

Hi @shamil ! It's still something we very much want to do, but hasn't been scheduled yet.

shamil commented 2 years ago

Thanks @jszwedko I will wait ;)

ottramst commented 2 years ago

Would like this feature as well :) We're working in an environment where we capture multiple different log formats and try to send them to Elasticsearch. Needless to say, this feature would help us out a lot.

We were previously using the Dead Letter Queue on Logstash for this, where we read the failed documents straight out of the Dead Letter Queue with a dedicated plugin, did some transforms, and indexed them back (in a different format) into Elasticsearch for further investigation.

rlazimi-dev commented 2 years ago

Just bumping - the need for this feature came up for me again.

rlazimi-dev commented 2 years ago

I think it's important that the DLQ contain actionable messages, where an actionable message looks like {error message, payload that caused the error}. With such messages, we can figure out what failed and also determine exactly why it failed.

If it is too much work to architect a DLQ that chains off of sinks, then for the sake of delivering such functionality, I think it is worth extending a feature you have already architected.

The feature I'm referring to is the internal_logs source.

Proposed (simple) solution: considering that the internal_logs source already produces an event per error, recording the error and the sink it came from, all that's needed to improve the existing error handling is to include another field containing the original payload that caused the error.

Example: just to show how much the current internal_logs implementation already captures, below is an example of the fields it produces. A comment denotes the change I think would be helpful.

{
    "response": "Response { status: 400, version: HTTP/1.1, headers: {\"server\": \"openresty/1.15.8.1\ ... cut for brevity}",
    // "payload": {...}, //<------ payload should contain the original message that was in the component that the error came from
    "metadata": {
      "level": "ERROR",
      "kind": "event",
      "target": "vector::sinks::util::sink",
      "module_path": "vector::sinks::util::sink"
    },
    "pid": 1,
    "timestamp": "2022-06-13T15:37:11.637097037Z",
    "source_type": "internal_logs",
    "host": "vector-deployment-edge-container-id",
    "message": "Response failed."
  }
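
For context, these error events can already be picked out and shipped somewhere for inspection. Below is a rough sketch using the internal_logs source and a filter transform; the component names are illustrative, and the field paths are taken from the example above. The missing piece is still the original payload:

[sources.vector_logs]
  type = "internal_logs"

# Keep only error-level events emitted by sink components.
[transforms.sink_errors]
  inputs = [ "vector_logs" ]
  type = "filter"
  condition = '.metadata.level == "ERROR" && contains(string!(.metadata.target), "sinks")'

# Ship them somewhere they can be inspected (console is just a stand-in).
[sinks.error_archive]
  inputs = [ "sink_errors" ]
  type = "console"
  encoding.codec = "json"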

Note: I'm aware that this implementation does not really produce a DLQ, and that it does not technically satisfy the OP's description, but I believe many people would be satisfied with it for now, because it at least drastically helps in diagnosing sink errors. Thumbs up if you think this is "good enough" for now, or thumbs down if I'm misguided. I just want something to be done rather than waiting too long for the perfect solution.

@jszwedko @binarylogic what do you think? I haven't read the code, but I would guess that it's straightforward to implement, considering you already seem to have the infrastructure set up for internal logs to pick up sink errors.

Thank you for the read!

jszwedko commented 2 years ago

Thanks for the thoughts @rlazimi-dev ! I think this is something we might pick up in Q3.

awangc commented 2 years ago

Any updates on this? It's something that could be really useful.

syntastical commented 2 years ago

I effectively just ran into this issue too. I'm using the HTTP sink, and I'd really like to have access to the output produced by the HTTP request for use in other transforms and sinks.

kushalhalder commented 1 year ago

Folks, any updates on this? We cannot use this for any stable pipeline if a DLQ is not supported. Or is there an alternative DLQ strategy?

kushalhalder commented 1 year ago

For example: https://github.com/vectordotdev/vector-test-harness/blob/master/cases/tcp_to_http_performance/ansible/config_files/vector.toml Even in this case, we are testing for speed, but what about retries, stability, and fault tolerance?

jszwedko commented 1 year ago

No updates yet unfortunately. Sinks do retry requests for transient failures.

tastyfrankfurt commented 1 year ago

Have to say this is a pretty big gap, as some of those logs could be critical and need recovering, or you could be put in a position where you end up duplicating logs through the stack just to recover a couple. I think this is a must-have feature for sinks, especially given that something similar has already been implemented in VRL.

Maybe a simpler approach for the Elasticsearch sink would be to just add an additional configuration item for specifying a DLQ index, and Vector could output to that index if the current one fails all retries.
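
Something along these lines, where dlq_index is a purely hypothetical option that does not exist today and the remaining settings are only illustrative:

[sinks.elasticsearch_sink_out]
  inputs = [ "kafka_in" ]
  type = "elasticsearch"
  endpoints = [ "https://elasticsearch:9200" ]
  bulk.index = "logs-%Y-%m-%d"

  # Hypothetical option, does not exist today: once retries are exhausted
  # (or a request is rejected as non-retriable), index the raw event here
  # instead of dropping it.
  dlq_index = "logs-dlq-%Y-%m-%d"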

ravenjm commented 1 year ago

I think it's not about retries, especially when we're talking about Elasticsearch. The most common issue is a mapping mismatch, for example:

{
     "field1": "this is a string"
}

{
     "field1": 12345
}

knightprog commented 1 year ago

I have the exact same case as @ravenjm: using the Elasticsearch sink, logs rejected because of a mapping mismatch are just dropped.
It would be really interesting to be able to route them to another sink.

roniez commented 7 months ago

I have run into the same issue, and it would be really nice to send failed messages to another sink, perhaps back to a Kafka topic dedicated to DLQ messages or to a file on disk.

marcus-crane commented 6 months ago

One particular use case I have is around using Vector for delivering logs. We persist some logs in Kafka, which then go through a processing pipeline, but occasionally someone will send through a log line that is bigger than the maximum Kafka payload size (which we probably have set to the default).

In these cases, it would be nice to have a DLQ action. Probably not a literal queue but, say, rerouting such an event to an S3 bucket. Ironically, the standard pattern of publishing to a dead letter Kafka topic wouldn't work here, because that topic wouldn't be sized large enough either, and it's rare enough that ordering is not that useful a property.
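
As a rough sketch of that rerouting with components that exist today, a route transform in front of the Kafka sink could divert oversized events to an aws_s3 sink (the names, size threshold, and sink settings are all illustrative):

# Events small enough for Kafka go there as usual; anything oversized
# falls out of "_unmatched" and is parked in S3 instead.
[transforms.size_check]
  inputs = [ "processed_logs" ]
  type = "route"
  route.fits_in_kafka = 'length(encode_json(.)) < 900000'

[sinks.kafka_out]
  inputs = [ "size_check.fits_in_kafka" ]
  type = "kafka"
  bootstrap_servers = "${KAFKA_BROKERS}"
  topic = "logs"
  encoding.codec = "json"

[sinks.oversized_to_s3]
  inputs = [ "size_check._unmatched" ]
  type = "aws_s3"
  bucket = "oversized-log-events"
  region = "us-east-1"
  encoding.codec = "json"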

DimDroll commented 6 months ago

someone will send through a log line that is bigger than the maximum Kafka payload size (which we probably have set to the default).

@marcus-crane for such cases we developed a number of remaps that handle "dead letter" cases.

The first one checks the event's size and then aborts processing:

[transforms.defaultChecks]
  type            = "remap"
  inputs          = ["parseLogs"]
  drop_on_abort   = true
  drop_on_error   = true
  reroute_dropped = true

  source = '''
# Log events to console:
  if ${TOML_DEBUG} { log("PRE-defaultChecks: " + encode_json(.), level: "info", rate_limit_secs: 60) }

####################################################################
####################### WHEN EVENT IS > 3MB ########################
####################################################################

# initialize a possibly empty error object
  if is_object(.error) {
    .error = object!(.error)
  } else {
    .error = {}
  }

# if the root object's byte size is bigger than Kafka's max.message.bytes (3MB),
# we should invalidate it for later inspection
  if length(encode_json(.)) > 3000000 {
    .error = merge(.error, {"type": "pipeline", "message": "MessageSizeTooLarge"})
    abort
  }

  . = compact(.)

# Log events to console:
  if ${TOML_DEBUG} { log("POST-defaultChecks: " + encode_json(.), level: "info", rate_limit_secs: 60) }
'''

Next, ALL messages aborted in the previous remaps go through a catch-all remap that tags them properly. "route" is an internal field name we use for dynamic event routing:

[transforms.catchAbortAndError]
  type   = "remap"
  inputs = ["*.dropped"]

  source = '''
# Log events to console:
  if ${TOML_DEBUG} { log("PRE-catchAbortAndError: " + encode_json(.), level: "info", rate_limit_secs: 60) }
  if ${TOML_DEBUG} { log("METADATA: " + encode_json(%), level: "info", rate_limit_secs: 60) }

####################################################################
###################### WHEN EVENT HAS ERRORS #######################
####################################################################

# the metadata contains the abort/error information
  .event.metadata    = encode_json(%)

  .route.type      = "all"
  .route.dataset   = "catch"
  .route.namespace = "all"

# Log events to console:
  if ${TOML_DEBUG} { log("POST-catchAbortAndError: " + encode_json(.), level: "info", rate_limit_secs: 60) }
'''

And lastly, all our Kafka sinks are configured to support large messages with:

[sinks.tranformedKafka]
  type              = "kafka"
  inputs            = ["catchAbortAndError", "defaultChecks"]
  bootstrap_servers = "${KAFKA_BROKERS}"
  topic             = "{{`{{ .route.type }}`}}-{{`{{ .route.dataset }}`}}-{{`{{ .route.namespace }}`}}"
  compression       = "none"
  healthcheck       = true

  [sinks.tranformedKafka.encoding]
    codec = "json"

  [sinks.tranformedKafka.tls]
    enabled  = true
    ca_file  = "/etc/kafka-cluster-ca/ca.crt"
    crt_file = "/etc/${VECTOR_USER}-user-certs/user.crt"
    key_file = "/etc/${VECTOR_USER}-user-certs/user.key"

  # the dropped-event handling can produce events larger than the default, so allow up to 100MB
  [sinks.tranformedKafka.librdkafka_options]
    "message.max.bytes" = "100000000" # 100MB

This of course can't handle cases where Elasticsearch rejects a message due to mapping conflicts, which is a shame, but that can theoretically be worked around by doing strict schema verification in Vector; it is still on our to-do list. Hopefully this RFC will be finished and merged before we get to it :) https://github.com/vectordotdev/vector/pull/14708
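
In the meantime, a minimal sketch of what that strict schema check could look like, reusing the reroute_dropped pattern above and wired after the defaultChecks transform; the field name and expected type are just taken from the mapping-mismatch example earlier in the thread:

[transforms.schemaCheck]
  type            = "remap"
  inputs          = ["defaultChecks"]
  drop_on_abort   = true
  reroute_dropped = true

  source = '''
# Reject events whose fields don't match what the Elasticsearch mapping
# expects, before the sink ever sees them.
  if exists(.field1) && !is_string(.field1) {
    .error = merge(object(.error) ?? {}, {"type": "schema", "message": "field1 is not a string"})
    abort
  }
'''

Since reroute_dropped is enabled, the rejected events come out of schemaCheck.dropped and would be picked up by the existing catchAbortAndError transform through its "*.dropped" input.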