puckel / docker-airflow

Docker Apache Airflow
Apache License 2.0

Export connections so they can be shared #323

Open bryan3189 opened 5 years ago

bryan3189 commented 5 years ago

Currently, every time you spin up a local executor, all your connections are reset. Is there a way to specify connections in a file and have Airflow load them by default?

zhongjiajie commented 5 years ago

@bryan3189 I use a DAG to hold and create/overwrite connections. I will post an example tomorrow because the code is on my company computer. If I forget, feel free to remind me 24 hours after this comment.

zhongjiajie commented 5 years ago

@bryan3189 I have two files: a config file named conf.py and a DAG file named init_conn_var.py.

conf.py

var = {
    'connections': [
        {
            'conn_id': 'ssh_my_own_1',
            'conn_type': 'ssh',
            'host': '127.0.0.2',
            'port': 22,
            'login': 'root',
            'password': 'pwd',
        },
        {
            'conn_id': 'ssh_my_own_2',
            'conn_type': 'ssh',
            'host': '127.0.0.3',
            'port': 22,
            'login': 'root',
            'password': 'pwd',
        },
        ...
    ]
}

init_conn_var.py

import logging

from sqlalchemy.orm import exc

from airflow import DAG, Connection
from airflow.settings import Session
from airflow.operators.python_operator import PythonOperator

def crt_airflow_conn(conf):
    conn = Connection()
    conn.conn_id = conf.get('conn_id')
    conn.conn_type = conf.get('conn_type')
    conn.host = conf.get('host')
    conn.port = conf.get('port')
    conn.login = conf.get('login')
    conn.password = conf.get('password')
    conn.schema = conf.get('schema')
    conn.extra = conf.get('extra')

    session = Session()
    try:
        exists_conn = session.query(Connection.conn_id == conn.conn_id).one()
    except exc.NoResultFound:
        logging.info('connection not exists, will create it.')
    else:
        logging.info('connection exists, will delete it before create.')
        session.delete(exists_conn)
    finally:
        session.add(conn)
        session.commit()
    session.close()

dag = DAG(
    dag_id='create_conn',
    schedule_interval='@once',
)

for connection in conf.get('connection'):
    crt_conn = PythonOperator(
        task_id='create_conn_{}'.format(connection.get('conn_id')),
        python_callable=crt_airflow_conn,
        op_kwargs={'conf': connection},
        provide_context=False,
        dag=dag,
    )

This is just part of my config file; you could create more things the same way, like airflow.models' Connection, Variable, and Pool.
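
For reference, here is a minimal sketch of the same pattern applied to Variables and Pools, assuming Airflow 1.10's airflow.models.Variable and airflow.models.Pool; the helper names crt_airflow_variable and crt_airflow_pool are just illustrative, not part of the original code:

from airflow import settings
from airflow.models import Pool, Variable

def crt_airflow_variable(key, value):
    # Variable.set creates the variable or overwrites an existing one
    Variable.set(key, value)

def crt_airflow_pool(name, slots, description=''):
    # same delete-then-create pattern as crt_airflow_conn above
    session = settings.Session()
    existing_pool = session.query(Pool).filter(Pool.pool == name).first()
    if existing_pool:
        session.delete(existing_pool)
    session.add(Pool(pool=name, slots=slots, description=description))
    session.commit()
    session.close()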

bryan3189 commented 5 years ago

Thank you @zhongjiajie! This is helpful!

zhongjiajie commented 5 years ago

@bryan3189 You are welcome.

pratikhonrao commented 5 years ago

Hello @zhongjiajie. I am also trying to achieve something similar; however, I am getting this error: Broken DAG: [/usr/local/airflow/dags/init_conn_var.py] cannot import name 'Connection'

Any idea why this is happening?

zhongjiajie commented 5 years ago

@pratikhonrao Maybe you should change your import statement to:

from airflow.models.connection import Connection

pratikhonrao commented 5 years ago

Thanks @zhongjiajie, I tried a few things and it worked. The import statements that worked for me are:

import airflow
from airflow import DAG
from airflow.models import Connection
from airflow import (models, settings)
from airflow.operators.python_operator import PythonOperator

However, now I am trying to solve this error: Broken DAG: [/usr/local/airflow/dags/init_conn_var.py] name 'conf' is not defined

I am also unable to import Session. If I find a solution I will share it here.

zhongjiajie commented 5 years ago

@pratikhonrao Could you show the code you wrote? Including the filenames and file structure would be even better.

zhongjiajie commented 5 years ago

@pratikhonrao FYI, the reason from airflow import Connection does not work is that Airflow refactored the airflow.models code and changed its import path.

pratikhonrao commented 5 years ago

My init_conn_var.py looks like this:

import conf
import airflow
from airflow import DAG
from airflow.models import Connection
from airflow import (models, settings)
from airflow.operators.python_operator import PythonOperator
#from airflow.utils import timezone
#from datetime import datetime,timedelta
#import pendulum

def crt_airflow_conn(conf):
    conn = Connection()
    conn.conn_id = conf.get('conn_id')
    conn.conn_type = conf.get('conn_type')
    conn.host = conf.get('host')
    conn.port = conf.get('port')
    #conn.login = conf.get('login')
    #conn.password = conf.get('password')
    #conn.schema = conf.get('schema')
    #conn.extra = conf.get('extra')

    session = Session()
    try:
        exists_conn = session.query(Connection.conn_id == conn.conn_id).one()
    except exc.NoResultFound:
        logging.info('connection not exists, will create it.')
    else:
        logging.info('connection exists, will delete it before create.')
        session.delete(exists_conn)
    finally:
        session.add(conn)
        session.commit()
    session.close()

dag = DAG(
    dag_id='create_conn1',
    schedule_interval='@once',
)

for connection in conf.get('connection'):
    crt_conn = PythonOperator(
        task_id='create_conn_{}'.format(connection.get('conn_id')),
        python_callable=crt_airflow_conn,
        op_kwargs={'conf': connection},
        provide_context=False,
        dag=dag,
    )

And my conf.py looks like this:

var = {
    'connections': [
        {
            'conn_id': 'http_waterfall',
            'conn_type': 'http',
            'host': 'http://aws.xxx..com',
            'port': 8014
        },
        {
            'conn_id': 'ssh_my_own_2',
            'conn_type': 'ssh',
            'host': '127.0.0.3',
            'port': 22,
            'login': 'root',
            'password': 'pwd',
        }
    ]
}

And I am getting the error below: Broken DAG: [/usr/local/airflow/dags/init_conn_var.py] module 'conf' has no attribute 'get'

I haven't made many changes to your code, only minimal ones to see if it's working.

zhongjiajie commented 5 years ago

@pratikhonrao I am sorry, I just wrote some demo code; my actual script is more complex. But we can still make it work.

In init_conn_var.py we should change:

from conf import var     <-- HERE
import airflow
from airflow import DAG
from airflow.models import Connection
from airflow import (models, settings)
from airflow.operators.python_operator import PythonOperator
#from airflow.utils import timezone
#from datetime import datetime,timedelta
#import pendulum

.... 
for connection in var.get('connections'):      <-- HERE
    crt_conn = PythonOperator(
        task_id='create_conn_{}'.format(connection.get('conn_id')),
        python_callable=crt_airflow_conn,
        op_kwargs={'conf': connection},
        provide_context=False,
        dag=dag,
    )

I think this will fix it up.

pratikhonrao commented 5 years ago

Oh, thanks for the quick response @zhongjiajie, yes, it did fix the code!

However, I have run into another error: Broken DAG: [/usr/local/airflow/dags/init_conn_var.py] 'NoneType' object is not iterable.

Although we do have data present in the conf.py file. I am investigating it; do you have any quick suggestions or a direction where I can look?

zhongjiajie commented 5 years ago

@pratikhonrao I think it may be due to using the wrong dict key:

for connection in var.get('connections'):   # note: the key 'connections' must match the key defined in conf.py

You should debug it and check whether var.get('connections') returns None.

For more information, you could take a look at the section "Debugging an Airflow operator" to find out how to debug in Airflow.
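
As a quick sanity check before a full Airflow debugging session, a minimal sketch (assuming the conf.py shown above sits next to the DAG file) is to load the config by hand and confirm the key returns a list rather than None:

from conf import var

# should print the list of connection dicts defined in conf.py;
# None here means the key passed to var.get() does not match conf.py
print(var.get('connections'))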

pratikhonrao commented 5 years ago

Hello @zhongjiajie, I debugged it and it's creating connections now.

Below are the updated chunks which worked for me (Airflow version 1.10.2):

from conf import var
import airflow
from airflow import DAG
from airflow.models import Connection
from airflow import (models, settings)
from airflow.operators.python_operator import PythonOperator
...
    session = settings.Session()     <-- HERE
...
for connection in var.get('connections'):       <-- HERE
    crt_conn = PythonOperator(
        task_id='create_conn_{}'.format(connection.get('conn_id')),
        python_callable=crt_airflow_conn,
        op_kwargs={'conf': connection},
        provide_context=False,
        dag=dag,
    )

Connections are getting created now; however, the code is not able to check whether a connection already exists, and creates a new connection even if one is already present.

zhongjiajie commented 5 years ago

@pratikhonrao Yup. If we create connections from a DAG and a config file, just like init_conn_var.py, the config file should stay the single source of truth, so the connections in Airflow always hold the same values as the DAG config. That is why I put this code in there:

    session = Session()
    # if connection id exists will delete first
    # if connection id not exists will do nothing
    # but whether exists or not will create a new one in finally statement
    try:
        exists_conn = session.query(Connection.conn_id == conn.conn_id).one()
    except exc.NoResultFound:
        logging.info('connection not exists, will create it.')
    else:
        logging.info('connection exists, will delete it before create.')
        session.delete(exists_conn)
    finally:
        session.add(conn)
        session.commit()
    session.close()

If you don't agree with that, you can change the code to do what you expect.
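
If you prefer updating over delete-and-recreate, here is a minimal sketch of that alternative for the body of crt_airflow_conn, assuming the Connection model and settings.Session used elsewhere in this thread (just one option, not the author's code):

    session = settings.Session()
    existing_conn = session.query(Connection).filter(
        Connection.conn_id == conn.conn_id).first()
    if existing_conn is None:
        # nothing stored yet, insert the freshly built connection
        session.add(conn)
    else:
        # update the stored row in place instead of deleting it
        existing_conn.conn_type = conn.conn_type
        existing_conn.host = conn.host
        existing_conn.port = conn.port
        existing_conn.login = conn.login
        existing_conn.password = conn.password
        existing_conn.schema = conn.schema
        existing_conn.extra = conn.extra
    session.commit()
    session.close()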

silverwolfx10 commented 5 years ago

Quoting @pratikhonrao's comment above: "Connections are getting created now; however, the code is not able to check whether a connection already exists, and creates a new connection even if one is already present."

Change this line:

exists_conn = session.query(Connection).filter(Connection.conn_id == conn.conn_id).one()

And it will work properly.

naveedn commented 3 years ago

Thank you @zhongjiajie for a smart solution! I was able to get the code working for my project with the following code. I am running Airflow 1.10.9:

from airflow.models import Connection
from airflow import (models, settings)

def create_airflow_conn(conf):
    conn = Connection()
    conn.conn_id = conf.get('conn_id')
    conn.conn_type = conf.get('conn_type')
    conn.host = conf.get('host')
    conn.port = conf.get('port')
    conn.login = conf.get('login')
    conn.password = conf.get('password')
    conn.schema = conf.get('schema')
    conn.extra = conf.get('extra')

    session = settings.Session()
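    # delete any stored connection with the same conn_id (delete() is a no-op
    # when nothing matches), then add and commit the fresh connection in the
    # finally block, so re-running the callable stays idempotent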
    try:
        existing_conns = session.query(Connection).filter(
            Connection.conn_id == conn.conn_id).delete()
    finally:
        session.add(conn)
        session.commit()
    session.close()