puckel / docker-airflow

Docker Apache Airflow
Apache License 2.0

Airflow operator #543

Open PrateekGupta1819 opened 4 years ago

PrateekGupta1819 commented 4 years ago

How do I use the Airflow DockerOperator?

I can install the docker library through requirements.txt, but the DAG fails. Can somebody help? My guess is that the Docker daemon is not running.

$ cat requirements.txt
docker

docker run -d -p 8080:8080 -v $PWD/dags:/usr/local/airflow/dags -v $PWD/requirements.txt:/requirements.txt puckel/docker-airflow webserver

Dag details: dags/test.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator

default_args = {
        'owner'                 : 'airflow',
        'description'           : 'Use of the DockerOperator',
        'depends_on_past'       : False,
        'start_date'            : datetime(2018, 1, 3),
        'email_on_failure'      : False,
        'email_on_retry'        : False,
        'retries'               : 1,
        'retry_delay'           : timedelta(minutes=5)
}

with DAG('docker_dag', default_args=default_args, schedule_interval="5 * * * *", catchup=False) as dag:
        t1 = BashOperator(
                task_id='print_current_date',
                bash_command='date'
        )

        t2 = DockerOperator(
                task_id='docker_command',
                image='centos:latest',
                api_version='auto',
                auto_remove=True,
                command="/bin/sleep 30",
                docker_url="unix://var/run/docker.sock",
                network_mode="bridge"
        )

        t3 = BashOperator(
                task_id='print_hello',
                bash_command='echo "hello world"'
        )

        t1 >> t2 >> t3

Error:

*** Reading local file: /usr/local/airflow/logs/docker_dag/docker_command/2020-04-20T11:54:30.996363+00:00/1.log
[2020-04-20 11:54:49,337] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2020-04-20T11:54:30.996363+00:00 [queued]>
[2020-04-20 11:54:49,345] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2020-04-20T11:54:30.996363+00:00 [queued]>
[2020-04-20 11:54:49,345] {{taskinstance.py:866}} INFO - 
--------------------------------------------------------------------------------
[2020-04-20 11:54:49,345] {{taskinstance.py:867}} INFO - Starting attempt 1 of 2
[2020-04-20 11:54:49,345] {{taskinstance.py:868}} INFO - 
--------------------------------------------------------------------------------
[2020-04-20 11:54:49,352] {{taskinstance.py:887}} INFO - Executing <Task(DockerOperator): docker_command> on 2020-04-20T11:54:30.996363+00:00
[2020-04-20 11:54:49,354] {{standard_task_runner.py:53}} INFO - Started process 1170 to run task
[2020-04-20 11:54:49,394] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: docker_dag.docker_command 2020-04-20T11:54:30.996363+00:00 [running]> 24b22e2b7d4c
[2020-04-20 11:54:49,408] {{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
FileNotFoundError: [Errno 2] No such file or directory

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 720, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 400, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 207, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 230, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 546, in get
    return self.request('GET', url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 262, in execute
    tls=tls_config
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 190, in __init__
    self._version = self._retrieve_server_version()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 215, in _retrieve_server_version
    'Error while fetching server API version: {0}'.format(e)
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', FileNotFoundError(2, 'No such file or directory'))
[2020-04-20 11:54:49,410] {{taskinstance.py:1151}} INFO - Marking task as UP_FOR_RETRY
[2020-04-20 11:54:59,319] {{logging_mixin.py:112}} INFO - [2020-04-20 11:54:59,318] {{local_task_job.py:103}} INFO - Task exited with return code 1

wittfabian commented 4 years ago

The docker socket must also be mounted in the container.

PrateekGupta1819 commented 4 years ago

Hello, and thanks for replying. By mounting the Docker socket, do you mean adding this flag? -v /var/run/docker.sock:/var/run/docker.sock

Complete command: docker run -d -p 8080:8080 -v $PWD/dags:/usr/local/airflow/dags -v $PWD/requirements.txt:/requirements.txt -v /var/run/docker.sock:/var/run/docker.sock puckel/docker-airflow webserver

wittfabian commented 4 years ago

Yes
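
A quick way to tell the two failure modes in this thread apart (socket not mounted versus socket mounted but not accessible) is to inspect the socket file from inside the Airflow container. The following is only a minimal sketch: the function name `diagnose_socket` and its return strings are made up for illustration and are not part of Airflow or the docker library.

```python
import os
import stat


def diagnose_socket(path="/var/run/docker.sock"):
    """Return a short string describing whether the current user can use the socket."""
    try:
        info = os.stat(path)
    except FileNotFoundError:
        # Corresponds to the FileNotFoundError in the first log: nothing is mounted at this path.
        return "missing"
    except PermissionError:
        # A parent directory is not searchable by the current user.
        return "permission denied"
    if not stat.S_ISSOCK(info.st_mode):
        return "not a socket"
    # Connecting to a Unix socket requires write permission on the socket file.
    if not os.access(path, os.W_OK):
        return "permission denied"
    return "ok"
```

Running `print(diagnose_socket())` as the airflow user inside the container should report "missing" before the `-v /var/run/docker.sock:/var/run/docker.sock` mount is added, and something else afterwards.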

PrateekGupta1819 commented 4 years ago

I am still getting an error, now a permission error. I even used chmod to give everyone access to docker.sock: sudo chmod 777 /var/run/docker.sock

Log:

*** Reading local file: /usr/local/airflow/logs/docker_dag/docker_command/2020-04-20T12:05:00+00:00/2.log
[2020-04-20 13:10:36,443] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2020-04-20T12:05:00+00:00 [queued]>
[2020-04-20 13:10:36,450] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: docker_dag.docker_command 2020-04-20T12:05:00+00:00 [queued]>
[2020-04-20 13:10:36,450] {{taskinstance.py:866}} INFO - 
--------------------------------------------------------------------------------
[2020-04-20 13:10:36,450] {{taskinstance.py:867}} INFO - Starting attempt 2 of 2
[2020-04-20 13:10:36,450] {{taskinstance.py:868}} INFO - 
--------------------------------------------------------------------------------
[2020-04-20 13:10:36,457] {{taskinstance.py:887}} INFO - Executing <Task(DockerOperator): docker_command> on 2020-04-20T12:05:00+00:00
[2020-04-20 13:10:36,459] {{standard_task_runner.py:53}} INFO - Started process 361 to run task
[2020-04-20 13:10:36,494] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: docker_dag.docker_command 2020-04-20T12:05:00+00:00 [running]> 504cad745253
[2020-04-20 13:10:36,507] {{taskinstance.py:1128}} ERROR - Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
PermissionError: [Errno 13] Permission denied

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 449, in send
    timeout=timeout
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 720, in urlopen
    method, url, error=e, _pool=self, _stacktrace=sys.exc_info()[2]
  File "/usr/local/lib/python3.7/site-packages/urllib3/util/retry.py", line 400, in increment
    raise six.reraise(type(error), error, _stacktrace)
  File "/usr/local/lib/python3.7/site-packages/urllib3/packages/six.py", line 734, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 672, in urlopen
    chunked=chunked,
  File "/usr/local/lib/python3.7/site-packages/urllib3/connectionpool.py", line 387, in _make_request
    conn.request(method, url, **httplib_request_kw)
  File "/usr/local/lib/python3.7/http/client.py", line 1252, in request
    self._send_request(method, url, body, headers, encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1298, in _send_request
    self.endheaders(body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1247, in endheaders
    self._send_output(message_body, encode_chunked=encode_chunked)
  File "/usr/local/lib/python3.7/http/client.py", line 1026, in _send_output
    self.send(msg)
  File "/usr/local/lib/python3.7/http/client.py", line 966, in send
    self.connect()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/transport/unixconn.py", line 43, in connect
    sock.connect(self.unix_socket)
urllib3.exceptions.ProtocolError: ('Connection aborted.', PermissionError(13, 'Permission denied'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 207, in _retrieve_server_version
    return self.version(api_version=False)["ApiVersion"]
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/daemon.py", line 181, in version
    return self._result(self._get(url), json=True)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/utils/decorators.py", line 46, in inner
    return f(self, *args, **kwargs)
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 230, in _get
    return self.get(url, **self._set_request_timeout(kwargs))
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 546, in get
    return self.request('GET', url, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 533, in request
    resp = self.send(prep, **send_kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/sessions.py", line 646, in send
    r = adapter.send(request, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/requests/adapters.py", line 498, in send
    raise ConnectionError(err, request=request)
requests.exceptions.ConnectionError: ('Connection aborted.', PermissionError(13, 'Permission denied'))

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/docker_operator.py", line 262, in execute
    tls=tls_config
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 190, in __init__
    self._version = self._retrieve_server_version()
  File "/usr/local/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 215, in _retrieve_server_version
    'Error while fetching server API version: {0}'.format(e)
docker.errors.DockerException: Error while fetching server API version: ('Connection aborted.', PermissionError(13, 'Permission denied'))
[2020-04-20 13:10:36,509] {{taskinstance.py:1170}} INFO - All retries failed; marking task as FAILED.dag_id=docker_dag, task_id=docker_command, execution_date=20200420T120500, start_date=20200420T131036, end_date=20200420T131036
[2020-04-20 13:10:46,447] {{logging_mixin.py:112}} INFO - [2020-04-20 13:10:46,446] {{local_task_job.py:103}} INFO - Task exited with return code 1

wittfabian commented 4 years ago

See https://stackoverflow.com/a/60092639 Maybe this will help.

PrateekGupta1819 commented 4 years ago

I think the sudo chmod did not work. I don't have vim inside the container, so I can see the config file but can't edit it.

airflow@f363c97765a4:~$ ls /var/run/ -pla
total 12
drwxr-xr-x 1 root root 4096 Apr 20 14:48 ./
drwxr-xr-x 1 root root 4096 Apr 20 14:48 ../
srw-rw---- 1 root root    0 Apr 20 11:23 docker.sock
drwxrwxrwt 2 root root 4096 Jan 30 00:00 lock/
-rw-rw-r-- 1 root utmp    0 Jan 30 00:00 utmp
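
The listing explains the PermissionError: docker.sock is owned by root:root with mode srw-rw----, so only root and members of group root may write to (and therefore connect to) it. A small sketch of that check follows. `may_connect` is a hypothetical helper that only approximates the kernel's rules, and the uid/gid 1000 used for the airflow user is an assumption, not something shown in the thread.

```python
import stat

# Mode of docker.sock as shown in the listing: a socket with rw for owner and group only.
SOCK_MODE = stat.S_IFSOCK | 0o660


def may_connect(mode, owner_uid, owner_gid, uid, gids):
    """Approximate the permission check for connecting to a Unix socket.

    Connecting requires write permission; root (uid 0) bypasses the check.
    """
    if uid == 0:
        return True
    if uid == owner_uid:
        return bool(mode & stat.S_IWUSR)
    if owner_gid in gids:
        return bool(mode & stat.S_IWGRP)
    return bool(mode & stat.S_IWOTH)


print(stat.filemode(SOCK_MODE))  # srw-rw----
# Assumed non-root user (uid/gid 1000) that is not a member of group root (gid 0).
print(may_connect(SOCK_MODE, 0, 0, uid=1000, gids={1000}))  # False
```

With mode 666 or 777 the "other" write bit is set, which is why the chmod-based fixes discussed in this thread make the error go away.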

wittfabian commented 4 years ago

Is there no possibility of restarting the container?

PrateekGupta1819 commented 4 years ago

Got it working. I logged into the container as root and changed the permissions on docker.sock to 777. Now Airflow can execute the DockerOperator.

wittfabian commented 4 years ago

But this is not a permanent solution. Once the container restarts, the same problem occurs.

PrateekGupta1819 commented 4 years ago

Yes, a container restart will bring the issue back. I am not sure how this can be fixed from docker run. Maybe start as root, but that may have its own set of issues.
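
One option that may survive restarts without running as root is docker run's `--group-add` flag, which adds the container user to the group that owns the socket. Nobody in this thread tested it, so treat the following as a sketch under that assumption. `build_run_command` is a hypothetical helper, the GID 999 is made up, and the dags and requirements.txt mounts from the original command are omitted for brevity.

```python
import os
import shlex


def build_run_command(sock_path="/var/run/docker.sock", gid=None):
    """Build a docker run command that adds the socket's owning group to the container user."""
    if gid is None:
        gid = os.stat(sock_path).st_gid  # group that owns the socket on the host
    return [
        "docker", "run", "-d",
        "-p", "8080:8080",
        "-v", f"{sock_path}:/var/run/docker.sock",
        "--group-add", str(gid),
        "puckel/docker-airflow", "webserver",
    ]


# 999 is an illustrative GID; omit gid= to read the real one from the socket.
print(shlex.join(build_run_command(gid=999)))
```

This only helps if the group has write permission on the socket, as it does with the srw-rw---- mode shown earlier.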

tamlyn commented 3 years ago

What worked best for me was to connect to the Docker engine over TCP instead of the file system socket as explained at https://github.com/docker/for-mac/issues/770#issuecomment-252560286

On the host spin up a little helper image:

docker run -d -v /var/run/docker.sock:/var/run/docker.sock -p 127.0.0.1:1234:1234 bobrik/socat TCP-LISTEN:1234,fork UNIX-CONNECT:/var/run/docker.sock

Then set up the DockerOperator to connect to docker_url="tcp://host.docker.internal:1234".
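
Before pointing the DockerOperator at the TCP endpoint, it can help to confirm from inside the Airflow container that the socat helper is actually reachable. A minimal sketch using only the standard library follows; `tcp_reachable` is a made-up helper name, and whether `host.docker.internal` resolves depends on the Docker setup.

```python
import socket


def tcp_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port can be opened within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False
```

For the setup above, the call would be `tcp_reachable("host.docker.internal", 1234)`.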

entechlog commented 3 years ago

Yes. container restart will have issues. I am not sure how this can be fixed from docker run. May be start with root, but that may have its own set of issues.

Did you manage to find a workaround for this issue?

eracle commented 3 years ago

Unfortunately, it is not possible to change the permissions in the Dockerfile, since the mount is performed via the docker-compose file: when the Docker image is built, the mounted file is not there yet. I solved it by modifying the permissions of the socket on my laptop, OUTSIDE the Docker container. I ran: sudo chmod 666 /var/run/docker.sock

strange1elit commented 1 year ago

Dag details: dags/store_DAG.py

from airflow import DAG
from airflow.operators.mysql_operator import MySqlOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'Airflow',
    'start_date': datetime(2023, 1, 11),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

# Must be a list of strings. As in the original post, it is not passed to the DAG.
template_searchpath = ['/usr/local/airflow/sql_files']

dag = DAG('store_dag', default_args=default_args, schedule_interval='@daily', catchup=False)

# Not part of the dependency chain below; it repeats the statement in create_table.
t1 = MySqlOperator(
    task_id='create_mysql_table',
    mysql_conn_id='mysql_conn',
    sql='CREATE TABLE my_table (id INT, name VARCHAR(255));',
    dag=dag,
)

create_table = MySqlOperator(
    task_id='create_table',
    mysql_conn_id='mysql_conn',
    sql='CREATE TABLE my_table (id INT, name VARCHAR(255));',
    dag=dag,
)

insert_values = MySqlOperator(
    task_id='insert_values',
    mysql_conn_id='mysql_conn',
    sql="INSERT INTO my_table (id, name) VALUES (1, 'John Doe'), (2, 'Jane Smith');",
    dag=dag,
)

select_values = MySqlOperator(
    task_id='select_values',
    mysql_conn_id='mysql_conn',
    sql='SELECT * FROM my_table;',
    dag=dag,
)

create_table >> insert_values >> select_values

strange1elit commented 1 year ago

Error:

*** Reading local file: /usr/local/airflow/logs/store_dag/create_table/2023-01-12T17:00:57.034504+00:00/2.log
[2023-01-12 17:01:31,730] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: store_dag.create_table 2023-01-12T17:00:57.034504+00:00 [queued]>
[2023-01-12 17:01:31,789] {{taskinstance.py:655}} INFO - Dependencies all met for <TaskInstance: store_dag.create_table 2023-01-12T17:00:57.034504+00:00 [queued]>
[2023-01-12 17:01:31,789] {{taskinstance.py:866}} INFO -

[2023-01-12 17:01:31,790] {{taskinstance.py:867}} INFO - Starting attempt 2 of 2
[2023-01-12 17:01:31,790] {{taskinstance.py:868}} INFO -

[2023-01-12 17:01:31,832] {{taskinstance.py:887}} INFO - Executing <Task(MySqlOperator): create_table> on 2023-01-12T17:00:57.034504+00:00
[2023-01-12 17:01:31,840] {{standard_task_runner.py:53}} INFO - Started process 40963 to run task
[2023-01-12 17:01:32,128] {{logging_mixin.py:112}} INFO - Running %s on host %s <TaskInstance: store_dag.create_table 2023-01-12T17:00:57.034504+00:00 [running]> 21c1316b7bdf
[2023-01-12 17:01:32,201] {{mysql_operator.py:61}} INFO - Executing: CREATE TABLE my_table (id INT, name VARCHAR(255));
[2023-01-12 17:01:32,236] {{taskinstance.py:1128}} ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 966, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.7/site-packages/airflow/operators/mysql_operator.py", line 67, in execute
    parameters=self.parameters)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/dbapi_hook.py", line 162, in run
    with closing(self.get_conn()) as conn:
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/mysql_hook.py", line 72, in get_conn
    conn = self.get_connection(self.mysql_conn_id)
  File "/usr/local/lib/python3.7/site-packages/airflow/hooks/base_hook.py", line 84, in get_connection
    log.info("Using connection to: %s", conn.log_info())
  File "/usr/local/lib/python3.7/site-packages/airflow/models/connection.py", line 320, in log_info
    "XXXXXXXX" if self.password else None,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/attributes.py", line 353, in __get__
    retval = self.descriptor.__get__(instance, owner)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/connection.py", line 190, in get_password
    return fernet.decrypt(bytes(self._password, 'utf-8')).decode()
  File "/usr/local/lib/python3.7/site-packages/cryptography/fernet.py", line 171, in decrypt
    raise InvalidToken
cryptography.fernet.InvalidToken
[2023-01-12 17:01:32,249] {{taskinstance.py:1170}} INFO - All retries failed; marking task as FAILED.dag_id=store_dag, task_id=create_table, execution_date=20230112T170057, start_date=20230112T170131, end_date=20230112T170132
[2023-01-12 17:01:41,699] {{logging_mixin.py:112}} INFO - [2023-01-12 17:01:41,697] {{local_task_job.py:103}} INFO - Task exited with return code 1

strange1elit commented 1 year ago

I am not able to create the table.
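
The traceback ends in cryptography.fernet.InvalidToken while decrypting the stored connection password, which usually means the password was encrypted with a different Fernet key than the one the task process is using. That is a guess about this setup, not something the log proves. If it is the cause, one fix is to generate a key once and pass the same value to every Airflow container, then save the mysql_conn connection again. A minimal sketch of generating such a key with only the standard library (`generate_fernet_key` is a made-up helper name):

```python
import base64
import os


def generate_fernet_key():
    """Return a new Fernet-compatible key: 32 random bytes, URL-safe base64 encoded."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode()


key = generate_fernet_key()
# Airflow reads the key from the AIRFLOW__CORE__FERNET_KEY environment variable.
print(f"AIRFLOW__CORE__FERNET_KEY={key}")
```

The printed value can be passed to docker run with `-e`. Connections saved under a previous key cannot be decrypted with the new one, so they have to be entered again.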