apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
36.59k stars 14.17k forks source link

The Airflow Scheduler and Airflow Triggerer are failing to load the openlineage plugin with Custom extractors #38037

Open aditya-7 opened 7 months ago

aditya-7 commented 7 months ago

Apache Airflow Provider(s)

openlineage

Versions of Apache Airflow Providers

Screenshot 2024-03-11 at 6 52 24 PM

Apache Airflow version

2.8.2

Operating System

Debian GNU/Linux 12 (bookworm)

Deployment

Docker-Compose

Deployment details

Docker Compose version v2.24.3-desktop.

Created a custom docker image using Dockerfile:

FROM apache/airflow:2.8.2
# COPY manager.py /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py
COPY dags /opt/airflow/dags/
COPY plugins /opt/airflow/plugins/

Changed x-airflow-common.&airflow-common in the docker-compose.yml file:

  &airflow-common
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.2}
  build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:- apache-airflow-providers-amazon}
    AIRFLOW__OPENLINEAGE__TRANSPORT: '{"type":"http","url":"http://192.168.1.40:9090"}'
    AIRFLOW__OPENLINEAGE__NAMESPACE: MyNamespace
    AIRFLOW__OPENLINEAGE__EXTRACTORS: plugins.extractors.some_lineage_extractor.MyExtractor

Built & deployed using the command: docker-compose build && docker-compose up

This is my project structure: Screenshot 2024-03-11 at 7 29 04 PM

What happened

While I deploy Airflow, the airflow-scheduler, and the airflow-triggerer containers fail to load the openlineage plugin. They can load inbuilt extractors such as BashExtractor, PythonExtractor, etc. Interestingly, the airflow-init container was able to load the plugin successfully. I was able to test this by overriding the library file /home/airflow/.local/lib/python3.8/site-packages/airflow/providers/openlineage/extractors/manager.py with a few debug points using the logger. I overwrote the ExtractorManager constructor to add some debug points like this:

class ExtractorManager(LoggingMixin):
    """Class abstracting management of custom extractors."""

    def __init__(self):
        super().__init__()
        self.extractors: dict[str, type[BaseExtractor]] = {}
        self.default_extractor = DefaultExtractor

        # Built-in Extractors like Bash and Python
        for extractor in _iter_extractor_types():
            print(f"inbuilt extractor: {extractor}")
            for operator_class in extractor.get_operator_classnames():
                self.extractors[operator_class] = extractor

        # Semicolon-separated extractors in Airflow configuration or OPENLINEAGE_EXTRACTORS variable.
        # Extractors should implement BaseExtractor
        env_extractors = conf.get("openlineage", "extractors", fallback=os.getenv("OPENLINEAGE_EXTRACTORS"))
        # skip either when it's empty string or None
        if env_extractors:
            self.log.info(f"All extractors: {env_extractors}")
            for extractor in env_extractors.split(";"):
                self.log.info(f"extractor:{extractor}")
                try:
                    self.log.info(f"PATH = '{os.getenv('PATH')}'")
                    self.log.info(os.listdir('/opt/airflow/plugins/extractors/'))
                except FileNotFoundError:
                    self.log.error('Extractors directory does not exist.')
                extractor: type[BaseExtractor] = try_import_from_string(extractor.strip())

                for operator_class in extractor.get_operator_classnames():
                    if operator_class in self.extractors:
                        self.log.debug(
                            "Duplicate extractor found for `%s`. `%s` will be used instead of `%s`",
                            operator_class,
                            extractor,
                            self.extractors[operator_class],
                        )
                    self.extractors[operator_class] = extractor

What you think should happen instead

The Airflow triggerer and the scheduler should also be able to import the Custom extractor class like the Airflow init container did, and successfully load the openlineage plugin.

How to reproduce

  1. Create the project folder with a simple DAG in the dags/ folder.
  2. Write a custom extractor <project_root>/plugins/extractors/some_ilneage_extractor.py
    
    from airflow.providers.openlineage.extractors import BaseExtractor, OperatorLineage
    from openlineage.client.run import Dataset

class MyExtractor(BaseExtractor):

@classmethod
def get_operator_classnames(cls):
    return ['PythonOperator']

def extract(self) -> OperatorLineage:
    return OperatorLineage(inputs=[Dataset(namespace=f"s3a://{self.operator.input_bucket}", name=source)
                                   for source in sorted(self.operator.resolved_source_objects)],
                           outputs=[Dataset(namespace=f"s3a://{self.operator.output_bucket}", name=source)
                                    for source in sorted(self.operator.resolved_destination_objects)])

3. Create a `Dockerfile` at project root and create update the `docker-compose.yaml` file as mentioned in the "Deployment Details" section above.
4. Run `docker-compose build && docker-compose up` to see the errors in the log.

### Anything else
I verified that the extractor class files are present in the containers by adding custom logs to the manager class. Relevant logs are added in the "What Happened" section. 
Also verified that the default lineage metrics are submitted to the Lineage backed (Marquez).

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
boring-cyborg[bot] commented 7 months ago

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

rylativity commented 6 months ago

I am running into an issue with the openlineage plugin too, however it is occuring when I try to install the Datahub plugin (acryl-datahub-airflow-plugin[plugin-v2]) to extract lineage as described here.

I am using Airflow 2.8.4 directly from the quickstart docker-compose.yml file. Didn't have this issue in Airflow 2.5.x-2.7.x.

WARN[0000] The "AIRFLOW_UID" variable is not set. Defaulting to a blank string. 
WARN[0000] The "AIRFLOW_UID" variable is not set. Defaulting to a blank string. 
airflow-scheduler-1  | 
airflow-scheduler-1  | BACKEND=redis
airflow-scheduler-1  | DB_HOST=redis
airflow-scheduler-1  | DB_PORT=6379
airflow-scheduler-1  | 
airflow-scheduler-1  |   ____________       _____________
airflow-scheduler-1  |  ____    |__( )_________  __/__  /________      __
airflow-scheduler-1  | ____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
airflow-scheduler-1  | ___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
airflow-scheduler-1  |  _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
airflow-scheduler-1  | [2024-04-06T21:15:29.903+0000] {plugins_manager.py:247} ERROR - Failed to import plugin openlineage
airflow-scheduler-1  | Traceback (most recent call last):
airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/plugins_manager.py", line 239, in load_entrypoint_plugins
airflow-scheduler-1  |     plugin_class = entry_point.load()
airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.10/site-packages/importlib_metadata/__init__.py", line 211, in load
airflow-scheduler-1  |     module = import_module(match.group('module'))
airflow-scheduler-1  |   File "/usr/local/lib/python3.10/importlib/__init__.py", line 126, in import_module
airflow-scheduler-1  |     return _bootstrap._gcd_import(name[level:], package, level)
airflow-scheduler-1  |   File "<frozen importlib._bootstrap>", line 1050, in _gcd_import
airflow-scheduler-1  |   File "<frozen importlib._bootstrap>", line 1027, in _find_and_load
airflow-scheduler-1  |   File "<frozen importlib._bootstrap>", line 1006, in _find_and_load_unlocked
airflow-scheduler-1  |   File "<frozen importlib._bootstrap>", line 688, in _load_unlocked
airflow-scheduler-1  |   File "<frozen importlib._bootstrap_external>", line 883, in exec_module
airflow-scheduler-1  |   File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/openlineage/plugins/openlineage.py", line 23, in <module>
airflow-scheduler-1  |     from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/openlineage/plugins/listener.py", line 28, in <module>
airflow-scheduler-1  |     from airflow.providers.openlineage.plugins.adapter import OpenLineageAdapter, RunState
airflow-scheduler-1  |   File "/home/airflow/.local/lib/python3.10/site-packages/airflow/providers/openlineage/plugins/adapter.py", line 26, in <module>
airflow-scheduler-1  |     from openlineage.client.facet import (
airflow-scheduler-1  | ImportError: cannot import name 'JobTypeJobFacet' from 'openlineage.client.facet' (/home/airflow/.local/lib/python3.10/site-packages/openlineage/client/facet.py)