astronomer / airflow-provider-kafka

A provider package for kafka
Apache License 2.0

ProduceToTopicOperator execution_timeout #18

Closed: AlexandrChikur closed this issue 1 year ago

AlexandrChikur commented 1 year ago

Hi there!

I'm having trouble using the ProduceToTopicOperator when Kafka is unavailable. The problem is that the task "hangs" for 30 or more minutes while producing. Neither execution_timeout nor poll_timeout does anything.

I'd like to limit the task execution time; could anybody help me?

Here is the operator definition:

```python
t = ProduceToTopicOperator(
    task_id="some_task_id",
    topic="ordrs",
    producer_function="module.producer_function",
    kafka_config={"bootstrap.servers": "broker:9092"},
    producer_function_kwargs={"tasks": tasks},
    trigger_rule=TriggerRule.ALL_SUCCESS,
    execution_timeout=timedelta(seconds=5),
)
```

Producer function:

```python
import json


def producer_function(tasks):
    for task in tasks:
        print(f"Task to produce:\n{json.dumps(task, sort_keys=True, indent=4)}")
        yield (json.dumps(hash(str(task))), json.dumps(task))
```

Task duration: [screenshot of the task duration omitted]

Update: I did some research and now I know that it takes 3 attempts of 5 minutes each and then marks the task as failed. I want to change this behaviour to 4 seconds per attempt, and still have the task marked as failed.

dylanbstorey commented 1 year ago

I'm not sure exactly what behaviour you're attempting to achieve; could you provide a more detailed example to help me out?

AlexandrChikur commented 1 year ago

Thanks for the reply, @dylanbstorey! I don't need help anymore, thank you a lot.

I found some configuration options for the confluent-kafka Producer class, which ProduceToTopicOperator is based on. I set kafka_config={"message.timeout.ms": "4000"} and wrote my own delivery callback function that raises AirflowException if the ack has an error.
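For anyone landing here later, here is a minimal sketch of that approach. It assumes the operator passes kafka_config straight to the confluent-kafka Producer (whose message.timeout.ms defaults to 300000 ms, i.e. the roughly 5-minute hang described above) and that your installed version of the operator accepts a delivery callback; the callback name, module path, and the exact argument for wiring it up are illustrative, not the provider's documented API.

```python
from airflow.exceptions import AirflowException


def delivery_callback(err, msg):
    """Delivery report handler for the confluent-kafka Producer.

    Called once per produced message: `err` is a KafkaError (or None on
    success) and `msg` is the produced Message.
    """
    if err is not None:
        # Fail the Airflow task instead of only logging the delivery error.
        raise AirflowException(f"Message delivery failed: {err}")
    print(f"Delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


t = ProduceToTopicOperator(
    task_id="some_task_id",
    topic="ordrs",
    producer_function="module.producer_function",
    producer_function_kwargs={"tasks": tasks},
    kafka_config={
        "bootstrap.servers": "broker:9092",
        # Give up after 4 seconds instead of librdkafka's 5-minute default,
        # so an unreachable broker surfaces as a delivery error quickly.
        "message.timeout.ms": "4000",
    },
    # Assumed parameter: check your installed provider version for the exact
    # argument name and whether it expects a callable or a dotted-path string.
    delivery_callback="module.delivery_callback",
)
```

This keeps the timeout and failure handling inside the Kafka client configuration rather than relying on Airflow's execution_timeout, which is the separation of concerns mentioned in the next comment.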

dylanbstorey commented 1 year ago

I think that's a better solution, honestly, as it keeps the separation of concerns very clear.