We’re trying to use AwaitMessageSensor from the Apache Kafka Airflow provider to consume messages from Kafka. AwaitMessageSensor has a mandatory parameter, “apply_function”, which names the function used to process the Kafka messages. Its value must be a dotted string: the import path to that function.
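For context, here is a minimal sketch of the kind of function apply_function points to. It assumes, as in the provider's example DAGs, that the trigger calls the function once per consumed message (a confluent_kafka Message, whose value() returns the raw payload bytes) and fires only when the return value is truthy. FakeMessage and the "file_uploaded" filter are hypothetical and exist only for illustration.

```python
import json
from typing import Any, Optional


class FakeMessage:
    """Hypothetical stand-in for confluent_kafka.Message; only value() is modelled."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def value(self) -> bytes:
        return self._payload


def process_messages(message: Any) -> Optional[dict]:
    """Return the decoded event if it is relevant, otherwise None.

    Assumption: a falsy return means "keep waiting", and a truthy
    return value is handed on as the trigger event.
    """
    event = json.loads(message.value())  # json.loads accepts bytes
    # Hypothetical filter: only react to one event type.
    if event.get("type") == "file_uploaded":
        return event
    return None


if __name__ == "__main__":
    print(process_messages(FakeMessage(b'{"type": "file_uploaded", "path": "a.csv"}')))
    print(process_messages(FakeMessage(b'{"type": "heartbeat"}')))
```

Saved as event_based_triggering.py in the DAG folder, this is the function the dotted strings below try to reach.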
No matter which path we try, the sensor never finds the function.
PYTHONPATH has the following paths:
/opt/python3.11/bin
/opt/python3.11/lib/python311.zip
/opt/python3.11/lib/python3.11
/opt/python3.11/lib/python3.11/lib-dynload
/opt/python3.11/lib/python3.11/site-packages
/home/airflow/gcs/dags
/etc/airflow/config
/home/airflow/gcs/plugins
Location of the DAG file: /home/airflow/gcs/dags
Directory the DAG runs in: /home/airflow
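The list above is what one process sees. A small diagnostic like the sketch below (check_importable is a hypothetical helper, not part of Airflow) prints the actual sys.path of the process it runs in and tests whether a module name can be located there. Where it runs matters: the traceback below shows the function being resolved inside the triggerer (triggerer_job_runner.py), so a path list taken from a DAG or worker may not be the one that applies.

```python
import importlib.util
import sys


def check_importable(dotted_module: str) -> bool:
    """Return True if dotted_module can be located on this process's sys.path."""
    try:
        # For "a.b", find_spec imports the parent package "a" first and
        # raises ModuleNotFoundError if that parent cannot be found.
        return importlib.util.find_spec(dotted_module) is not None
    except ModuleNotFoundError:
        return False


if __name__ == "__main__":
    # sys.path of *this* process, which is what imports actually search.
    for entry in sys.path:
        print(entry)
    # Check only the module part: the last dotted segment is the function name,
    # which a dotted-path loader looks up with getattr after importing the module.
    for candidate in ("event_based_triggering", "dags.event_based_triggering"):
        print(candidate, check_importable(candidate))
```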
We tried the following values for apply_function:
process_messages
event_based_triggering.process_messages
dags.event_based_triggering.process_messages
gcs.dags.event_based_triggering.process_messages
airflow.gcs.dags.event_based_triggering.process_messages
home.airflow.gcs.dags.event_based_triggering.process_messages
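To see what each of these values requires, here is a simplified re-implementation, written for illustration and not copied from Airflow, of how a dotted path is resolved in the style of airflow.utils.module_loading.import_string: split on the last dot, import the module part, then look up the attribute. Under that approach a bare name such as process_messages has no module part and is rejected outright, and every other value works only if its module part is importable from sys.path of the importing process (for example, dags.event_based_triggering would need /home/airflow/gcs on sys.path).

```python
from importlib import import_module
from typing import Any


def import_string(dotted_path: str) -> Any:
    """Resolve "package.module.attr" to the object it names (illustrative sketch)."""
    try:
        module_path, attr_name = dotted_path.rsplit(".", 1)
    except ValueError:
        # No dot at all, e.g. "process_messages": nothing to import.
        raise ImportError(f"{dotted_path} doesn't look like a module path") from None
    # Raises ModuleNotFoundError if module_path is not importable from sys.path.
    module = import_module(module_path)
    try:
        return getattr(module, attr_name)
    except AttributeError:
        raise ImportError(
            f'Module "{module_path}" does not define a "{attr_name}" attribute'
        ) from None


if __name__ == "__main__":
    import json

    print(import_string("json.dumps") is json.dumps)
```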
What you think should happen instead
Below is the error from the log:
ERROR - Trigger failed:
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 529, in cleanup_finished_triggers
result = details["task"].result()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 602, in run_trigger
async for event in trigger.run():
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/apache/kafka/triggers/await_message.py", line 99, in run
processing_call = import_string(self.apply_function)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/utils/module_loading.py", line 39, in import_string
module = import_module(module_path)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
File "<frozen importlib._bootstrap>", line 1126, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
File "<frozen importlib._bootstrap>", line 1140, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'name_of_a_module'
We did some digging and testing.
If we import our module at the top of the DAG file, the import works and we can call the function in the DAG:
from util import my_process
However, AwaitMessageSensor (through its trigger) resolves the dotted string with importlib.import_module("util.my_package"), and that fails.
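One possible explanation, which is our assumption and not confirmed above: the traceback shows the import happening inside the triggerer (triggerer_job_runner.py), a different process from the one that parses the DAG file, so a successful top-level import in the DAG says nothing about the triggerer's sys.path or file system. The sketch below isolates the underlying mechanism: importlib.import_module finds a module only when its directory is on sys.path of the importing process. The module name kafka_helpers_demo is hypothetical.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def demo() -> tuple[bool, bool]:
    """Show that import_module only finds a module whose directory is on sys.path."""
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical helper module, standing in for a module in the DAG folder.
        Path(tmp, "kafka_helpers_demo.py").write_text(
            "def process_messages(message):\n    return message\n"
        )
        importlib.invalidate_caches()

        # 1) Directory not on sys.path: the import fails.
        try:
            importlib.import_module("kafka_helpers_demo")
            found_before = True
        except ModuleNotFoundError:
            found_before = False

        # 2) Same file, directory added to sys.path: the import succeeds.
        sys.path.append(tmp)
        try:
            module = importlib.import_module("kafka_helpers_demo")
            found_after = callable(module.process_messages)
        finally:
            # Undo the side effects so the demo leaves no trace.
            sys.path.remove(tmp)
            sys.path_importer_cache.pop(tmp, None)
            sys.modules.pop("kafka_helpers_demo", None)
    return found_before, found_after


if __name__ == "__main__":
    print(demo())
```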
We have found a workaround for our problem: ConsumeFromTopicOperator accepts either a dotted string or a callable as the value of "apply_function", and it works fine. Why not do the same for AwaitMessageSensor? Comparing ConsumeFromTopicOperator with AwaitMessageTrigger, the only change needed is:
if isinstance(self.apply_function, str):
    process_message = import_string(self.apply_function)
else:
    process_message = self.apply_function
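A self-contained sketch of the proposed branching, using importlib directly instead of Airflow's import_string so it runs without Airflow installed. The helper name resolve_apply_function and the ApplyFunction alias are hypothetical; the alias shows what a widened type hint accepting either form could look like. Unlike the snippet above, the sketch rejects values that are neither a string nor callable.

```python
from importlib import import_module
from typing import Any, Callable, Union

# Hypothetical alias: a dotted path string or the function object itself.
ApplyFunction = Union[str, Callable[..., Any]]


def resolve_apply_function(apply_function: ApplyFunction) -> Callable[..., Any]:
    """Return a callable for either a dotted path or a callable."""
    if isinstance(apply_function, str):
        module_path, _, attr_name = apply_function.rpartition(".")
        if not module_path:
            raise ImportError(f"{apply_function!r} doesn't look like a module path")
        # Same strategy as a dotted-path loader: import the module, then getattr.
        return getattr(import_module(module_path), attr_name)
    if callable(apply_function):
        # Already a function object: use it as-is, no import needed.
        return apply_function
    raise TypeError(
        f"apply_function must be a str or callable, got {type(apply_function).__name__}"
    )
```

With this, a DAG could pass either "event_based_triggering.process_messages" or the imported process_messages function itself.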
Apache Airflow Provider(s)
google
Versions of Apache Airflow Providers
2.9.3
Apache Airflow version
Apache Airflow Providers Google 10.12.0
Operating System
Ubuntu
Deployment
Google Cloud Composer
Deployment details
No response
What you think should happen instead
In addition, the type definition of apply_function in AwaitMessageSensor and AwaitMessageTrigger needs to change so that it accepts a callable as well as a string. Overall, the change is small.
How to reproduce
Use AwaitMessageSensor in Google Cloud Composer and the problem occurs.
Anything else
The problem happens every time.