astronomer / airflow-provider-kafka

A provider package for kafka
Apache License 2.0

Add template fields to ProduceToTopicOperator #6

Closed (sergeyh closed 2 years ago)

sergeyh commented 2 years ago

This change enables Jinja templating for ProduceToTopicOperator. It is useful when the producer function needs values from the execution context, e.g. the data interval, or depends on an upstream result, e.g. an XCom value. Without it, you have to write a custom PythonOperator and use the Hook/Producer directly.

Example 1 - dynamically route messages to a topic based on upstream status:

ProduceToTopicOperator(topic="export_result_{{ ti.xcom_pull(task_ids='export_status') }}")

Example 2 - access the context variables, e.g. DagRun or TaskInstance, from producer_function:

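def produce_messages(**context):
    # By default the templated kwargs arrive as rendered strings, not datetime objects
    return load_from_db(start_date=context['start_date'], end_date=context['end_date'])

# task_id, topic and other arguments omitted for brevity
ProduceToTopicOperator(
    producer_function=produce_messages,
    producer_function_kwargs={
        "start_date": "{{ data_interval_start }}",
        "end_date": "{{ data_interval_end }}",
    },
)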
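
For readers unfamiliar with template fields, here is a rough, standard-library-only model of what happens to the two examples above. It is a sketch under assumptions, not the provider's or Airflow's implementation: `FakeProduceOperator`, `render`, and the `status` context key are hypothetical, the regex only handles bare `{{ name }}` placeholders (real Jinja also evaluates expressions such as `ti.xcom_pull(...)`), and nothing is sent to Kafka.

```python
import re
from datetime import datetime
from typing import Any, Dict

# Matches "{{ name }}" placeholders; a deliberately tiny stand-in for Jinja.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: Dict[str, Any]) -> Any:
    """Recursively substitute placeholders in strings, dicts, lists and tuples."""
    if isinstance(value, str):
        return _PLACEHOLDER.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(render(v, context) for v in value)
    return value  # non-string leaves (ints, None, ...) pass through unchanged


class FakeProduceOperator:
    """Hypothetical stand-in for an operator; not the provider's class."""

    # Only the attributes named here are rendered before execute() runs.
    template_fields = ("topic", "producer_function_kwargs")

    def __init__(self, topic, producer_function, producer_function_kwargs=None):
        self.topic = topic
        self.producer_function = producer_function
        self.producer_function_kwargs = producer_function_kwargs or {}

    def render_template_fields(self, context):
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))

    def execute(self):
        # Return (topic, messages) instead of producing to Kafka.
        messages = list(self.producer_function(**self.producer_function_kwargs))
        return self.topic, messages


def produce_messages(start_date, end_date):
    # Both arguments are plain strings by the time this function is called.
    yield ("window", f"{start_date}/{end_date}")


op = FakeProduceOperator(
    topic="export_result_{{ status }}",
    producer_function=produce_messages,
    producer_function_kwargs={
        "start_date": "{{ data_interval_start }}",
        "end_date": "{{ data_interval_end }}",
    },
)
context = {
    "status": "success",
    "data_interval_start": datetime(2023, 1, 1),
    "data_interval_end": datetime(2023, 1, 2),
}
op.render_template_fields(context)
topic, messages = op.execute()
```

After rendering, `topic` is `"export_result_success"` and the producer function receives the interval bounds as strings. The same holds in Airflow by default, so a producer function that needs datetime objects has to parse the rendered values itself.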

Tested this manually. Unfortunately, I don't know how to write a unit test for this.