damavis / airflow-pentaho-plugin

Pentaho plugin for Apache Airflow - Orchestrate Pentaho transformations and jobs from Airflow
Apache License 2.0

Airflow 1.10 and this plugin #19

Closed sanderpukk closed 1 year ago

sanderpukk commented 1 year ago

Hi!

I (for now) have to use the old Airflow 1.10. I'm getting an error that I have found little information about, and the source code doesn't say much about it either.

```
[2022-09-27 18:20:26,623] {taskinstance.py:1150} ERROR - __init__() missing 1 required positional argument: 'source'
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/operators/carte.py", line 97, in execute
    conn = self._get_pentaho_carte_client()
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/operators/carte.py", line 90, in _get_pentaho_carte_client
    return PentahoCarteHook(conn_id=self.pdi_conn_id,
  File "/usr/local/lib/python3.8/site-packages/airflow_pentaho/hooks/carte.py", line 167, in __init__
    super().__init__()
TypeError: __init__() missing 1 required positional argument: 'source'
```
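For anyone landing on the same traceback: my reading (not confirmed by the plugin authors) is that Airflow 1.10's `BaseHook.__init__` still requires a `source` argument, which Airflow 2.x dropped, so the hook's bare `super().__init__()` only works against 2.x. A self-contained sketch of that mismatch, using simplified stand-in classes rather than the real Airflow or airflow_pentaho code:

```python
# Stand-in classes that mimic the two BaseHook signatures; this is a simplified
# illustration, not the actual Airflow or airflow_pentaho source.

class BaseHook110:
    """Airflow 1.10-style BaseHook: __init__ requires a 'source' argument."""
    def __init__(self, source):
        self.source = source


class BaseHook2x:
    """Airflow 2.x-style BaseHook: no required __init__ arguments."""


def build_hook(base_cls):
    # The plugin's hook calls super().__init__() with no arguments,
    # which only satisfies the 2.x-style base class.
    class PentahoCarteHookLike(base_cls):
        def __init__(self, conn_id="pdi_default"):
            super().__init__()  # TypeError against the 1.10-style base
            self.conn_id = conn_id

    return PentahoCarteHookLike()


build_hook(BaseHook2x)  # works

try:
    build_hook(BaseHook110)
except TypeError as exc:
    print(exc)  # __init__() missing 1 required positional argument: 'source'
```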

The thing is, the DAG works on my own machine (Airflow 2.0 and a self-hosted Carte), but not on the server where it is 1.10. The Airflow connections are all set up with the extra parameters.
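For the record, this is how I double-check that the connection and its extras are actually visible on the 1.10 server (the exact extra keys the plugin reads are documented in its README; this just prints whatever is stored):

```python
# Sanity check that the pdi_default connection and its extras are visible to Airflow 1.10.
# Adjust the conn_id if yours differs; see the plugin README for which extra keys it expects.
from airflow.hooks.base_hook import BaseHook

conn = BaseHook.get_connection("pdi_default")
print(conn.host, conn.port, conn.login)
print(conn.extra_dejson)  # the Carte/PDI extra parameters stored on the connection
```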

Is the plugin not compatible with older Airflow anymore?

The DAG:

```python
# Relevant imports (the operator's module path matches the traceback above)
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow_pentaho.operators.carte import CarteJobOperator

work_params = {
    "pentaho_db_user": pentaho_dwh_db_user,
    "pentaho_dwh_db_pass": pentaho_dwh_db_pass,
    "pentaho_ods_db_pass": pentaho_ods_db_pass,
    "pentaho_dwh_host": db_dwh_conn_params.get("db_ip"),
    "pentaho_dwh_port": db_dwh_conn_params.get("db_port"),
    "pentaho_dwh_db_name": db_dwh_conn_params.get("db_name"),
    "pentaho_ods_host": db_ods_conn_params.get("db_ip"),
    "pentaho_ods_port": db_ods_conn_params.get("db_port"),
    "pentaho_ods_db_name": db_ods_conn_params.get("db_name"),
    "pentaho_job_path": pentaho_job_path,
    "insert_sql_path": insert_sql_path,
}

DAG_NAME = "eelis_pdi_dwh_f_lkoht"
DEFAULT_ARGS = {
    'owner': 'me',
}

with DAG(dag_id=DAG_NAME,
         default_args=DEFAULT_ARGS,
         catchup=False,
         start_date=days_ago(1),
         schedule_interval='15 3 *') as dag:

    # Define the task using the CarteJobOperator
    insert_into_lkoht = CarteJobOperator(
        task_id="job_testing1",
        job=pentaho_job_path,
        params=work_params,
        dag=dag)

    # ... #

    insert_into_lkoht
```

In Airflow I can see that all the parameters are there and populated; nothing is empty, and the Pentaho job path is found as well. The pip-installed version is the newest, 1.0.10, or at least it should be (I still see some traces of older source code).
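To rule out a stale install, this is the quick check I'd run on the 1.10 server (assuming the distribution is named `airflow-pentaho-plugin`, matching the PyPI package):

```python
# Print the installed plugin version; I expect 1.0.10 here.
import pkg_resources

print(pkg_resources.get_distribution("airflow-pentaho-plugin").version)
```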

sanderpukk commented 1 year ago

Thank you so much :) Huge thanks!