vectordotdev / vector

A high-performance observability data pipeline.
https://vector.dev
Mozilla Public License 2.0
17.51k stars 1.53k forks source link

Add support for kafka transactional producer #13539

Open fpytloun opened 2 years ago

fpytloun commented 2 years ago

A note for the community

Use Cases

Attempted Solutions

librdkafka_options."transactional.id" = "${HOSTNAME}"
librdkafka_options."transaction.timeout.ms" = "300000"

This results in following error as it requires changes in producer code:

ERROR sink{component_kind="sink" component_id=out_fluent_kafka component_type=kafka component_name=out_fluent_kafka}:request{request_id=272}: vector_core::stream::driver: Service call failed. error=KafkaError (Message production error: State (Local: Erroneous state)) request_id=272

Second option (transaction.timeout.ms) must be less or equal to message_timeout_ms (which defaults to 300000).

Until this is implemented, simple idempotent producer can be used with:

librdkafka_options."enable.idempotence" = "true"

Proposal

No response

References

Version

No response

sumeet-zuora commented 2 years ago

@fpytloun where u able to resolve the issue with 2022-07-19T16:47:09.242424Z ERROR sink{component_kind="sink" component_id=kafka component_type=kafka component_name=kafka}:request{request_id=1129}: vector_core::stream::driver: Service call failed. error=KafkaError (Message production error: State (Local: Erroneous state)) request_id=1129

fpytloun commented 2 years ago

@fpytloun where u able to resolve the issue with 2022-07-19T16:47:09.242424Z ERROR sink{component_kind="sink" component_id=kafka component_type=kafka component_name=kafka}:request{request_id=1129}: vector_core::stream::driver: Service call failed. error=KafkaError (Message production error: State (Local: Erroneous state)) request_id=1129

Unfortunately no, it requires change in code to explicitly use transactions and I didn't try that.

eitanzuk commented 1 year ago

hi, is there any update on this ?