damavis / airflow-pentaho-plugin

Pentaho plugin for Apache Airflow - Orchestrate Pentaho transformations and jobs from Airflow
Apache License 2.0

Amazon Managed Workflows for Apache Airflow - Custom Plugin Issue #4

Closed · erincme closed this 3 years ago

erincme commented 3 years ago

Hello,

We want to use the airflow-pentaho plugin with Amazon Managed Workflows for Apache Airflow. But when we load the plugin as a custom plugin in the environment, the following error appears in the UI. The only version of Airflow available on AWS right now is 1.10.12.

Is there a way to use this plugin on AWS, either with a requirements file or a custom plugin file?

"Broken DAG: [/usr/local/airflow/dags/dag_test.py] No module named 'airflow_pentaho.hooks'; 'airflow_pentaho' is not a package"

Thanks in advance.

Erinc

piffall commented 3 years ago

Hi @erincme

I suppose you should be able to use it without problems by installing it with the requirements file. Could you copy/paste your dag_test.py file?

Thnx

erincme commented 3 years ago

This is the DAG file that we are trying to run.

requirements.txt

dag_test.zip

piffall commented 3 years ago

OK, could you try to import the operator like this:

from airflow.operators.pentaho import CarteTransOperator

piffall commented 3 years ago

I'm sorry @erincme, I've just seen what's happening. Let me check if it's possible to publish a backported version for apache-airflow 1.10.x.

piffall commented 3 years ago

Hi @erincme

I've just released version 1.0.6, which fixes an issue with 1.10.x compatibility. So please, update it in your requirements file.
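
For reference, the requirements.txt line would then look something like this (a sketch; it assumes the package is published on PyPI under the repository name):

airflow-pentaho-plugin==1.0.6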

That said, I've found other errors in the dag_test.zip DAGs.

Please check whether it works with version 1.0.6. I'll wait for your feedback before closing the issue.

Thanks.

erincme commented 3 years ago

Hi @piffall,

I have tried the new version with the managed Airflow, but the same error still occurs. I think the managed version of Airflow somehow doesn't support this custom plugin. By the way, I have corrected the errors that you mentioned.

We are going to try on-prem Airflow using the Docker version.

Thanks,

Broken DAG: [/usr/local/airflow/dags/dag_def.py] No module named 'airflow.operators.CarteJobOperator'

piffall commented 3 years ago

Hi @erincme ,

It seems that the import is not right. It should be:

from airflow.operators.airflow_pentaho import CarteJobOperator
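
For context, here is a rough sketch of why that path works on Airflow 1.10.x (this shows the general plugin mechanism, not the plugin's actual source, and the class name below is illustrative): a plugin registered with the name "airflow_pentaho" gets its operators exposed under airflow.operators.airflow_pentaho.

from airflow.plugins_manager import AirflowPlugin

from airflow_pentaho.operators.carte import CarteJobOperator


class PentahoPlugin(AirflowPlugin):  # illustrative class name
    # Operators listed here become importable as
    # airflow.operators.airflow_pentaho.<OperatorName> on Airflow 1.10.x.
    name = "airflow_pentaho"
    operators = [CarteJobOperator]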

Thanks

erincme commented 3 years ago

Actually, I have tried lots of combinations in the import line, but it doesn't find the custom plugin folder. It somehow seems to be related to the platform configuration.

I am also sending the link for the custom plugin installation docs.

https://docs.aws.amazon.com/mwaa/latest/userguide/configuring-dag-import-plugins.html

(screenshot)

Best,

piffall commented 3 years ago

Hi @erincme

It seems that the plugin is not installed. Please check whether your requirements.txt is installing the other dependencies.
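
If it helps, here is a quick diagnostic sketch (the DAG id and task id are illustrative, not from this thread): a throwaway DAG whose only task imports the package on the worker and prints where it was loaded from.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def _check_import():
    # Raises ImportError on the worker if requirements.txt did not install the package.
    import airflow_pentaho
    print("airflow_pentaho found at:", airflow_pentaho.__file__)


with DAG(dag_id="check_pentaho_install",  # illustrative DAG id
         schedule_interval=None,
         start_date=days_ago(1)) as dag:
    check_install = PythonOperator(task_id="check_install",
                                   python_callable=_check_import)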

I've tested on-prem, and it works:

(screenshot of the successful DAG run)

DAG code:

from airflow import DAG
from datetime import datetime, timedelta
import time

from airflow.utils.dates import days_ago

from airflow_pentaho.operators.carte import CarteJobOperator
from airflow.operators.python_operator import PythonOperator

DAG_NAME = "pdi_test"
DEFAULT_ARGS = {
    "owner": "erinc",
    "depends_on_past": False,
    "email": ["*****"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
    "start_date": days_ago(1) 
}

with DAG(
    dag_id="test",
    description="test of airflow* PDI integration",
    schedule_interval="0 19 * * *",
    default_args=DEFAULT_ARGS,
    catchup=False,
    on_failure_callback=lambda context: print("there was an error.."),
    on_success_callback=lambda context: print("DAG ran successfully..")
    ) as dag:

    # airflow_test1 job.
    job1 = CarteJobOperator(
        dag=dag,
        task_id="job1",
        job="",
    )

    # airflow_test2 job.
    job2 = CarteJobOperator(
        dag=dag,
        task_id="job2",
        job="",
    )

    # Wait for 10 minutes after job1. Then run job2.
    delay_python_task = PythonOperator(
        task_id="delay_python_task",
        dag=dag,
        python_callable=lambda: time.sleep(600)
    )

    job1 >> delay_python_task >> job2

erincme commented 3 years ago

Hi @piffall,

We finally managed to install the plugin in managed Airflow on AWS. These are the requirements file and the sample DAG we use. Maybe it will help someone who tries the same thing.

Thanks for your help.

airflow_pentaho.zip requirements.txt

DAG code:

from airflow import DAG
from datetime import datetime, timedelta
import time
from airflow.utils.dates import days_ago

from airflow_pentaho.operators.CarteJobOperator import CarteJobOperator
from airflow.operators.python_operator import PythonOperator

DAG_NAME = "pdi_test"
DEFAULT_ARGS = {
    "owner": "erinc",
    "start_date": days_ago(2), 
    "depends_on_past": False,
    "email": ["test@test.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=10)
}
dag = DAG(
        dag_id="test",
        description="test of airflow* PDI integration",
        schedule_interval="0 19 * * *",
        default_args=DEFAULT_ARGS,
        catchup=False,
        on_failure_callback=lambda context: print("there was an error.."),
        on_success_callback=lambda context: print("DAG ran successfully..")
)
# airflow_test1 job.
job1 = CarteJobOperator(
    dag=dag,
    task_id="airflow_test1.kjb",
    job=r"C:\Users\Administrator\Desktop\AIRFLOW_TEST",
)

# airflow_test2 job.
job2 = CarteJobOperator(
    dag=dag,
    task_id="airflow_test2.kjb",
    job=r"C:\Users\Administrator\Desktop\AIRFLOW_TEST",
)

# Wait for 10 minutes after job1. Then run job2.
delay_python_task = PythonOperator(
    task_id="delay_python_task",
    dag=dag,
    python_callable=lambda: time.sleep(600)
)

[delay_python_task, job1] >> job2