apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

TimeSensor does not trigger #73

Closed tkaymak closed 9 years ago

tkaymak commented 9 years ago

We are trying to kick off our pipeline with a TimeSensor, but it does not trigger. There is no error visible in the logs - the graph looks like this: Screenshot

And the code like this:


from airflow import DAG
from airflow.operators import DummyOperator, MySqlOperator, BashOperator
from datetime import datetime, time, timedelta
from airflow.models import Variable
from airflow.operators.sensors import TimeSensor
from airflow.operators.sensors import SqlSensor

default_args = {
    'owner': 'deploy',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 24),
    'queue': 'default',
}

# the DAG
dag = DAG('pvi', default_args=default_args)

# tables
app_installs = SqlSensor(
    conn_id="exasol",
    sql="""SELECT MAX(CREATED_TS) FROM PVI.APP_INSTALLS WHERE CREATED_DATE='{{ ds }}'""",
    task_id='table_app_installs',
    dag=dag)
app_logins = SqlSensor(
    conn_id="exasol",
    sql="""SELECT MAX(CREATED_TS) FROM PVI.APP_LOGINS WHERE CREATED_DATE='{{ ds }}'""",
    task_id='table_app_logins',
    dag=dag)

# time dependencies
midnight_sensor = TimeSensor(target_time=time(15, 28), task_id='midnight', dag=dag)
start_aggregation = DummyOperator(task_id='start_aggregation', dag=dag)
start_aggregation.set_upstream(midnight_sensor)
start_aggregation.set_downstream([app_logins, app_installs])

We tried to trigger the chain by setting target_time to 5 minutes in the future; the DAG gets refreshed every 60 seconds (default), but nothing happens. Worker, scheduler, Flower, and webserver run on the same machine with Python 2.7 and FreeBSD 9.3, and we have a second machine running a Celery worker that listens on a different queue (q2). We have another DAG that works as expected on q2.
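
A minimal sketch of routing a task to the q2 worker under the Celery executor (the task name and command here are hypothetical; queue is the same operator argument used in default_args above):

from airflow.operators import BashOperator

# Hypothetical task pinned to queue 'q2'; only a Celery worker subscribed
# to that queue will pick it up. Reuses the `dag` object defined above.
on_q2 = BashOperator(
    task_id='runs_on_q2',
    bash_command='echo running on q2',
    queue='q2',
    dag=dag)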

artwr commented 9 years ago

Hi Tobias, what is the output of airflow test pvi midnight 2015-06-24? That way you can usually see on stdout what the sensor returns. Best, Arthur

tkaymak commented 9 years ago

2015-06-24 16:48:20,515 - root - INFO - Filling up the DagBag from /home/deploy/bit.flow/dags
2015-06-24 16:48:20,516 - root - INFO - Importing /home/deploy/bit.flow/dags/dev.py
2015-06-24 16:48:20,881 - root - INFO - Loaded DAG <DAG: dev>
2015-06-24 16:48:20,881 - root - INFO - Importing /home/deploy/bit.flow/dags/email_reporting.py
2015-06-24 16:48:20,900 - root - INFO - Loaded DAG <DAG: email_reporting>
2015-06-24 16:48:20,900 - root - INFO - Importing /home/deploy/bit.flow/dags/pvi.py
2015-06-24 16:48:20,938 - root - INFO - Loaded DAG <DAG: pvi>
2015-06-24 16:48:20,938 - root - INFO - Importing /home/deploy/bit.flow/dags/tasks.py
2015-06-24 16:48:21,034 - root - INFO - Loaded DAG <DAG: tasks>
2015-06-24 16:48:21,048 - root - INFO -
--------------------------------------------------------------------------------
New run starting @2015-06-24T16:48:21.047843
--------------------------------------------------------------------------------
2015-06-24 16:48:21,048 - root - INFO - Executing <Task(TimeSensor): midnight> for 2015-06-24 00:00:00
2015-06-24 16:48:21,049 - root - INFO - Checking if the time (15:28:00) has come
2015-06-24 16:48:21,049 - root - INFO - Success criteria met. Exiting.

tkaymak commented 9 years ago

Hi Arthur, I also checked the logs for clock drift, but the machines are absolutely in sync. I don't know how to debug any further. Best, Tobias

mistercrunch commented 9 years ago

Do you have an airflow scheduler running?

tkaymak commented 9 years ago

Yes:

[deploy@airflow ~] ps auxwww | grep scheduler
deploy 26073  0.0  0.0  16316  1912 10  R+J   5:35PM 0:00.00 grep scheduler
deploy 25719  0.0  0.2 214320 62616 12  I+J   5:34PM 0:00.83 python: /usr/local/bin/python /usr/local/bin/airflow scheduler (python2.7)

tkaymak commented 9 years ago

Do I have to activate the tasks somehow? When I click on any of the 'Task Instances' views in the DAG, I get a "Not Found" error.

The tornado webserver log:

2015-06-24 17:44:23,096 - tornado.access - INFO - 200 GET /admin/airflow/graph?dag_id=pvi (10.5.124.14) 58.32ms
2015-06-24 17:44:28,110 - tornado.access - WARNING - 404 GET /admin/taskinstanceview/?flt1_dag_id_equals=pvi&flt2_task_id_equals=midnight&sort=3&desc=1 (10.5.124.14) 1.66ms

mistercrunch commented 9 years ago

Oh, the first problem is your start_date. The Airflow scheduler triggers schedule t only after t + dag.schedule_interval has passed, so a start_date of 2015-06-24 gets its first run at >= 2015-06-25.
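
A minimal sketch of a start_date consistent with that rule, assuming the default daily schedule_interval (the dates are illustrative):

from datetime import datetime

default_args = {
    'owner': 'deploy',
    'depends_on_past': False,
    # The run stamped 2015-06-24 only fires after 2015-06-25 00:00 has passed,
    # so the start_date must lie at least one schedule_interval in the past.
    'start_date': datetime(2015, 6, 23),
    'queue': 'default',
}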

The second problem I'm not sure about, but if you run your web server in debug mode (airflow webserver -d) you can get a stack trace for me.

tkaymak commented 9 years ago

Thank you so much! The first (and major) problem is solved! I checked the code, but I simply overlooked that. Regarding debug mode: when I try to start the webserver with the -d flag I get socket.error: [Errno 49] Can't assign requested address - starting it without the flag works flawlessly. Assigning another port via airflow webserver -d -p 8050 results in the same error.

2015-06-24 18:59:18,153 - root - INFO - Filling up the DagBag from /home/deploy/bit.flow/dags
2015-06-24 18:59:18,182 - root - INFO - Importing /home/deploy/bit.flow/dags/pvi.py
2015-06-24 18:59:18,197 - root - INFO - Loaded DAG <DAG: pvi>
2015-06-24 18:59:18,197 - root - INFO - Importing /home/deploy/bit.flow/dags/tasks.py
2015-06-24 18:59:18,238 - root - INFO - Loaded DAG <DAG: tasks>
Starting the web server on port 8050 and host 10.5.1.108.
2015-06-24 18:59:18,309 - werkzeug - INFO -  * Running on http://10.5.1.108:8050/ (Press CTRL+C to quit)
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.1.1', 'airflow')
  File "/usr/local/lib/python2.7/site-packages/pkg_resources/__init__.py", line 735, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/usr/local/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1652, in run_script
    exec(code, namespace, namespace)
  File "/usr/local/lib/python2.7/site-packages/airflow-1.1.1-py2.7.egg/EGG-INFO/scripts/airflow", line 10, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow-1.1.1-py2.7.egg/airflow/bin/cli.py", line 241, in webserver
    app.run(debug=True, port=args.port, host=args.hostname)
  File "/usr/local/lib/python2.7/site-packages/flask/app.py", line 772, in run
    run_simple(host, port, self, **options)
  File "/usr/local/lib/python2.7/site-packages/werkzeug/serving.py", line 618, in run_simple
    test_socket.bind((hostname, port))
  File "/usr/local/lib/python2.7/socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
socket.error: [Errno 49] Can't assign requested address

artwr commented 9 years ago

Do you get the same issue by using airflow webserver --debug -hn 0.0.0.0 -p 8050?

artwr commented 9 years ago

It looks like you might have something else tied to that port + address.

tkaymak commented 9 years ago

@artwr Yes, I really don't want to sound like an idiot, but I cannot spot any process taking up this port in sockstat (before starting the webserver):

[deploy@consumer-jail-04 ~] sockstat -4 -l
USER     COMMAND    PID   FD PROTO  LOCAL ADDRESS         FOREIGN ADDRESS
deploy   python2.7  8752  3  tcp4   10.5.1.36:8793        *:*
deploy   python2.7  5854  7  tcp4   10.5.1.36:8383        *:*
root     sshd       95192 3  tcp4   10.5.1.36:22          *:*
nagios   nrpe2      95149 4  tcp4   10.5.1.36:5666        *:*
root     monit      95125 5  tcp4   10.5.1.36:2812        *:*
root     syslogd    95100 6  udp4   10.5.1.36:514         *:*

However, running airflow webserver --debug -hn 0.0.0.0 -p 8050 worked:

[deploy@host ~] airflow webserver --debug -hn 0.0.0.0 -p 8050

2015-06-24 19:53:48,237 - root - INFO - Filling up the DagBag from /home/deploy/bit.flow/dags
2015-06-24 19:53:48,266 - root - INFO - Importing /home/deploy/bit.flow/dags/pvi.py
2015-06-24 19:53:48,281 - root - INFO - Loaded DAG <DAG: pvi>
2015-06-24 19:53:48,281 - root - INFO - Importing /home/deploy/bit.flow/dags/tasks.py
2015-06-24 19:53:48,323 - root - INFO - Loaded DAG <DAG: tasks>
Starting the web server on port 8050 and host 0.0.0.0.
2015-06-24 19:53:48,402 - werkzeug - INFO -  * Running on http://0.0.0.0:8050/ (Press CTRL+C to quit)
2015-06-24 19:53:48,405 - werkzeug - INFO -  * Restarting with stat

2015-06-24 19:53:49,035 - root - INFO - Filling up the DagBag from /home/deploy/bit.flow/dags
2015-06-24 19:53:49,130 - root - INFO - Importing /home/deploy/bit.flow/dags/pvi.py
2015-06-24 19:53:49,170 - root - INFO - Loaded DAG <DAG: pvi>
2015-06-24 19:53:49,170 - root - INFO - Importing /home/deploy/bit.flow/dags/tasks.py
2015-06-24 19:53:49,297 - root - INFO - Loaded DAG <DAG: tasks>
Starting the web server on port 8050 and host 0.0.0.0.
2015-06-24 19:53:57,225 - werkzeug - INFO - 10.5.124.14 - - [24/Jun/2015 19:53:57] "GET /admin/taskinstanceview/?flt1_dag_id_equals=pvi&flt2_task_id_equals=midnight&sort=3&desc=1 HTTP/1.1" 404 -
2015-06-24 19:54:08,223 - werkzeug - INFO - 10.5.124.14 - - [24/Jun/2015 19:54:08] "GET /admin/taskinstanceview/?flt1_dag_id_equals=pvi&flt2_task_id_equals=midnight&sort=3&desc=1 HTTP/1.1" 404 -

artwr commented 9 years ago

Maybe it's a connected socket? sockstat -c? It looks like the IP attached to the machine is 10.5.1.36, so maybe trying to bind to 10.5.1.108 is the issue. I am really more of a newbie as far as FreeBSD and sockets go :confused:.
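
A minimal sketch of that failure mode, assuming 10.5.1.108 is not assigned to any interface on the host while 10.5.1.36 is:

import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Binding to an address the host does not own fails immediately with
# socket.error: [Errno 49] Can't assign requested address (EADDRNOTAVAIL on FreeBSD).
s.bind(('10.5.1.108', 8050))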

tkaymak commented 9 years ago

@artwr Good catch, thank you! I was able to get a stack trace. The error is the same for all of my DAGs; even the simplest one, which only prints something with a BashOperator, throws the 404:

2015-06-25 08:00:48,072 - root - INFO - Importing /home/deploy/bit.flow/dags/dev.py
2015-06-25 08:00:48,072 - root - ERROR - Failed to import: /home/deploy/bit.flow/dags/dev.py
2015-06-25 08:00:48,072 - root - ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/site-packages/airflow-1.1.1-py2.7.egg/airflow/models.py", line 159, in process_file
    with utils.timeout(30):
  File "/usr/local/lib/python2.7/site-packages/airflow-1.1.1-py2.7.egg/airflow/utils.py", line 429, in __enter__
    signal.signal(signal.SIGALRM, self.handle_timeout)
ValueError: signal only works in main thread
2015-06-25 08:00:48,109 - werkzeug - INFO - 185.74.12.4 - - [25/Jun/2015 08:00:48] "GET /admin/airflow/code?dag_id=dev HTTP/1.1" 200 -
2015-06-25 08:00:48,146 - werkzeug - INFO - 185.74.12.4 - - [25/Jun/2015 08:00:48] "GET /static/d3.v3.min.js HTTP/1.1" 304 -
2015-06-25 08:00:48,148 - werkzeug - INFO - 185.74.12.4 - - [25/Jun/2015 08:00:48] "GET /static/dagre-d3.js HTTP/1.1" 304 -
2015-06-25 08:00:48,150 - werkzeug - INFO - 185.74.12.4 - - [25/Jun/2015 08:00:48] "GET /admin/static/vendor/bootstrap-daterangepicker/daterangepicker.js HTTP/1.1" 304 -
2015-06-25 08:00:48,152 - werkzeug - INFO - 185.74.12.4 - - [25/Jun/2015 08:00:48] "GET /admin/static/admin/js/form-1.0.0.js HTTP/1.1" 304 -
2015-06-25 08:00:49,262 - werkzeug - INFO - 185.74.12.4 - - [25/Jun/2015 08:00:49] "GET /admin/static/bootstrap/bootstrap3/fonts/glyphicons-halflings-regular.woff HTTP/1.1" 304 -
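
A minimal standalone repro of that ValueError (a hypothetical sketch: CPython only allows installing signal handlers from the main thread, and the traceback above shows airflow's utils.timeout calling signal.signal while DAG files are imported off the main thread):

import signal
import threading

def install_alarm_handler():
    # Raises ValueError: signal only works in main thread,
    # because signal.signal is called outside the main thread.
    signal.signal(signal.SIGALRM, lambda signum, frame: None)

t = threading.Thread(target=install_alarm_handler)
t.start()
t.join()
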
tkaymak commented 9 years ago

The graph is working now. The start_date explanation really helped, thanks! I will dig into the code to find out why I get the 404 for every DAG "Task Instance" view.

tkaymak commented 9 years ago

I will open a new issue regarding this as soon as I have more information about it. Thank you!