apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

SubDag Operator broken when used with pools - infinite retry of tasks #1225

Closed: syvineckruyk closed this issue 8 years ago

syvineckruyk commented 8 years ago

Dear Airflow Maintainers,

Before I tell you about my issue, let me describe my environment:

Environment

CentOS 6, CeleryExecutor (Redis), two workers, one scheduler, one webserver, MySQL

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators import BashOperator, SubDagOperator

dag_name = 'test_18'
test_pool = 'test_pool'
email_to = 'your@email.com'

now = datetime.now()
start_time = datetime(now.year, now.month, now.day, 12, 0, 0)
start_time = start_time + timedelta(days=-1)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    'email': [email_to],
    'email_on_failure': True,
    'email_on_retry': True,
    'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval='0 * * * *')

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 10',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 10 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=30),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

Id    Dttm            Dag Id                         Task Id                  Event    Execution Date  Owner  Extra
1910  03-27T15:37:36  test_18.test_subdag            test_task_2_with_exit_1  running  03-26T12:00:00  Test
1911  03-27T15:37:46  test_18.test_subdag            test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1916  03-27T15:39:18  test_18.test_subdag            test_task_2_with_exit_1  running  03-26T12:00:00  Test
1918  03-27T15:39:28  test_18.test_subdag            test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1924  03-27T15:41:18  test_18.test_subdag            test_task_2_with_exit_1  running  03-26T12:00:00  Test
1930  03-27T15:41:28  test_18.test_subdag            test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1912  03-27T15:37:54  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1914  03-27T15:38:05  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1917  03-27T15:39:23  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1920  03-27T15:39:33  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1921  03-27T15:40:54  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1922  03-27T15:41:04  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1925  03-27T15:41:26  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1926  03-27T15:41:36  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1927  03-27T15:41:52  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1928  03-27T15:42:02  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1929  03-27T15:42:18  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1931  03-27T15:42:28  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1933  03-27T15:43:52  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1934  03-27T15:44:02  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1935  03-27T15:44:30  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1936  03-27T15:44:40  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1937  03-27T15:44:56  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1938  03-27T15:45:06  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1939  03-27T15:46:35  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1940  03-27T15:46:45  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1941  03-27T15:47:42  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1942  03-27T15:47:52  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
1943  03-27T15:49:20  test_18.test_subdag_with_pool  test_task_2_with_exit_1  running  03-26T12:00:00  Test
1944  03-27T15:49:30  test_18.test_subdag_with_pool  test_task_2_with_exit_1  failed   03-26T12:00:00  Test
...

Now that you know a little about me, let me tell you about the issue I am having:

Description of Issue

When subdag operators are used with pools, sub-task failures are not "bubbled up" to the subdag operator. The failed sub-task retries indefinitely, even when retries is set to 0.

When a sub-task fails, the subdag operator stays in the running state, while the sub-task quickly enters the queued state and shortly thereafter re-enters the running state.
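Tallying the event log above makes the mismatch concrete. The sketch below is a rough check written for this report, not part of Airflow: it transcribes the rows shown (whitespace-separated, with the empty Extra column dropped) and counts `running` events per Dag Id. In the test DAG, `retries=2` is set on the SubDagOperator and the inner BashOperators don't set retries, so, assuming they keep the default of 0 retries, each subdag should run `test_task_2_with_exit_1` at most 1 + 2 = 3 times for the execution date.

```python
from collections import Counter

# Rows transcribed from the event log above: Id, Dttm, Dag Id, Task Id,
# Event, Execution Date, Owner (the Extra column is empty and omitted).
EVENT_LOG = """
1910 03-27T15:37:36 test_18.test_subdag test_task_2_with_exit_1 running 03-26T12:00:00 Test
1911 03-27T15:37:46 test_18.test_subdag test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1916 03-27T15:39:18 test_18.test_subdag test_task_2_with_exit_1 running 03-26T12:00:00 Test
1918 03-27T15:39:28 test_18.test_subdag test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1924 03-27T15:41:18 test_18.test_subdag test_task_2_with_exit_1 running 03-26T12:00:00 Test
1930 03-27T15:41:28 test_18.test_subdag test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1912 03-27T15:37:54 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1914 03-27T15:38:05 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1917 03-27T15:39:23 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1920 03-27T15:39:33 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1921 03-27T15:40:54 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1922 03-27T15:41:04 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1925 03-27T15:41:26 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1926 03-27T15:41:36 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1927 03-27T15:41:52 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1928 03-27T15:42:02 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1929 03-27T15:42:18 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1931 03-27T15:42:28 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1933 03-27T15:43:52 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1934 03-27T15:44:02 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1935 03-27T15:44:30 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1936 03-27T15:44:40 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1937 03-27T15:44:56 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1938 03-27T15:45:06 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1939 03-27T15:46:35 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1940 03-27T15:46:45 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1941 03-27T15:47:42 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1942 03-27T15:47:52 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
1943 03-27T15:49:20 test_18.test_subdag_with_pool test_task_2_with_exit_1 running 03-26T12:00:00 Test
1944 03-27T15:49:30 test_18.test_subdag_with_pool test_task_2_with_exit_1 failed 03-26T12:00:00 Test
"""

# retries set on the SubDagOperator in the test DAG; the inner BashOperators
# are assumed to keep the default of 0 retries.
SUBDAG_RETRIES = 2
EXPECTED_MAX_RUNS = SUBDAG_RETRIES + 1  # initial attempt plus retries


def count_runs(log_text):
    """Count 'running' events per Dag Id."""
    runs = Counter()
    for line in log_text.strip().splitlines():
        _id, _dttm, dag_id, _task_id, event, _exec_date, _owner = line.split()
        if event == "running":
            runs[dag_id] += 1
    return runs


runs = count_runs(EVENT_LOG)
for dag_id in sorted(runs):
    verdict = "ok" if runs[dag_id] <= EXPECTED_MAX_RUNS else "too many"
    print("{}: {} runs, expected at most {} -> {}".format(
        dag_id, runs[dag_id], EXPECTED_MAX_RUNS, verdict))
```

The unpooled subdag stops at 3 runs, while the pooled one reaches 12. Because the pasted log is cut off at "...", 12 is only a lower bound.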

This behavior has been observed on multiple versions. The testing for this report was done on 1.7.0rc1.

I also tested on HEAD as of 2016-03-26, where the issue seems to get worse. The subdag that is not pooled (which works as expected on 1.7.0rc1) enters the retry state as it should, but then never re-enters the running state and hangs indefinitely.

jlowin commented 8 years ago

@syvineckruyk Thanks so much for taking the time to fill out such a complete report. I believe I've identified the issue, but would you mind verifying it? The fix is in my branch, so you can either clone it or you can install it directly like this:

pip install git+https://github.com/jlowin/airflow@subdag_pool_issue_1225

This branch includes the fixes from #1220, so we'd have to merge that first, but I think it's just about ready to go. I don't think #1220 directly impacts your issue, but it touches the same code so I'd like to avoid a messy conflict. Please let me know if you have any issues getting it running -- if this works for you I'll put through the PR.

If you're curious, here's what I believe is going on:

syvineckruyk commented 8 years ago

@jlowin Awesome ... I'll give it a go ... will update later today

syvineckruyk commented 8 years ago

@jlowin still getting some weird (but different) behavior ... the subdag with pool did what it should the first time, but then it actually succeeded, which should be impossible since the task exits with code 1. I need a bit more time for a better analysis.

jlowin commented 8 years ago

Strange! These subdags might be more trouble than they're worth :) @syvineckruyk instead of watching your DAG through the scheduler and airflow UI, could you just try kicking off a backfill and see if it completes or hangs (and if it properly sets the task states -- you can check those in the UI)? I want to try to minimize any other variables that could be affecting things.

airflow backfill test_18 -s 2016-03-25 -e 2016-03-25

(note with your settings this will kick off more than one run -- you might want to set your interval to 1 day for simplicity)
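The "more than one run" remark comes down to schedule arithmetic. The sketch below uses plain `datetime` arithmetic, not Airflow's scheduler code, to count schedule ticks in a window. The window of one full calendar day is an assumption for illustration; the runs a real backfill creates also depend on `start_date` and the exact `-s`/`-e` bounds. The `'0 * * * *'` cron expression fires once an hour on the hour, so it is modeled here as `timedelta(hours=1)`.

```python
from datetime import datetime, timedelta


def schedule_ticks(start, end, interval):
    """Return start, start + interval, ... for every tick that is <= end."""
    ticks = []
    current = start
    while current <= end:
        ticks.append(current)
        current += interval
    return ticks


window_start = datetime(2016, 3, 25, 0, 0)
window_end = datetime(2016, 3, 25, 23, 59)  # assumed: one full day

hourly = schedule_ticks(window_start, window_end, timedelta(hours=1))  # '0 * * * *'
daily = schedule_ticks(window_start, window_end, timedelta(days=1))
print(len(hourly), len(daily))  # 24 1
```

Over the same day, the hourly schedule yields 24 runs to inspect versus 1 for a daily interval, which is why switching to `timedelta(days=1)` simplifies the test.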

It's a more controllable way to see what's happening than the Scheduler and in your case should give the same result because the subdag is ultimately just running that exact code.

Sorry for the iterative debugging process -- if you post any modifications you make to your code I'll run it as well.

jlowin commented 8 years ago

If I run that backfill command (with a fresh resetdb and a new test_pool with 10 slots), I get failures as expected when I check the UI after it finishes:

[Screenshot: 2016-03-27 at 11:12:06 PM]

That's Python 2.7.11 using the branch I linked above. Next I'll run it in the scheduler, and let's see if we can figure out what's going on.

I modified your code slightly to make it run faster (literally, I shortened the sleep and retry delays):

from datetime import datetime, timedelta
from airflow.models import DAG
from airflow.operators import BashOperator, SubDagOperator

dag_name = 'test_18'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 1',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 1 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=1,
        retry_delay=timedelta(seconds=3),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

jlowin commented 8 years ago

Running with the scheduler (after a full airflow resetdb), I also get the expected failures (and I think nothing else runs because depends_on_past=True):

[Screenshot: 2016-03-27 at 11:24:13 PM]

Please let me know if you think something else is going on -- I definitely want to get to the bottom of this.

syvineckruyk commented 8 years ago

@jlowin just got through a clean run ... (after resetdb) ... I want to perform a couple more tests/runs to make sure the behavior is consistent as I cannot explain my previous runs that had issues on your branch.

syvineckruyk commented 8 years ago

@jlowin still some issues happening. My current round of testing is based on issuing backfill from the CLI, but I suspect the behavior is the same from the scheduler.

I ran airflow backfill test_25 -s 2016-03-25 -e 2016-03-25 and it worked as expected, with the same result as yours. Then I ran airflow backfill test_25 -s 2016-03-26 -e 2016-03-26 and got a weird condition that should not be possible:

[Screenshot]

The tasks are in retry (and stuck there, by the way), but they have retries set to 0, so this should not be possible. Also, as you can see, the subdag operators that are parents of the sub-tasks are in success. So I think we both saw clean runs because of the resetdb, but something is still off when there are previous DAG runs / task instances.
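The expectation here ("retries set to 0, so retry should be impossible") can be written down as a tiny rule. This is a simplified model for discussion, not Airflow's actual failure-handling code: `try_number` is taken to be the 1-based count of attempts made so far, and a failed attempt moves to the `up_for_retry` state only while that count is within `retries`.

```python
FAILED = "failed"
UP_FOR_RETRY = "up_for_retry"


def state_after_failure(try_number, retries):
    """Simplified retry rule (a model, not Airflow's implementation).

    try_number: 1-based count of attempts so far, including the one that
    just failed. retries: number of extra attempts allowed after the first.
    """
    if try_number <= retries:
        return UP_FOR_RETRY
    return FAILED


# retries=0 (as in test_25): the first failure should be final.
print(state_after_failure(1, retries=0))  # failed
# retries=2: attempts 1 and 2 may be retried, attempt 3 fails for good.
print([state_after_failure(n, retries=2) for n in (1, 2, 3)])
# ['up_for_retry', 'up_for_retry', 'failed']
```

Under this rule a task with `retries=0` can never legitimately sit in the retry state, which is what makes the screenshot above a bug rather than expected behavior.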

Thanks for your help with all of this and let me know if you need anything from me.

jlowin commented 8 years ago

@syvineckruyk is test_25 the code I posted?

syvineckruyk commented 8 years ago

@jlowin yeah, I'm bumping up the version number for clean logs.

jlowin commented 8 years ago

@syvineckruyk unfortunately I can't reproduce that weird case in your screenshot. I'm putting the exact code I'm running at the bottom of this message. Thanks for your patience trying to solve this. If I had a gold medal to hand out for the "issue that identifies the most problems at once", you'd get it!

I run:

airflow resetdb -y
airflow backfill test_25 -s 2016-03-25 -e 2016-03-25
airflow backfill test_25 -s 2016-03-26 -e 2016-03-26

The 3/26 backfill simply doesn't run because its dependencies haven't been met. That's the correct behavior, since depends_on_past=True and the previous day failed.
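The `depends_on_past` gate described here can be modeled roughly as follows. This is a hedged sketch, not Airflow's implementation: `can_run` is a hypothetical helper that lets a run start only if it is the first scheduled run (`execution_date == start_date`) or the previous run of the same task ended in success. Airflow's real dependency check handles more cases than this.

```python
from datetime import datetime, timedelta


def can_run(execution_date, start_date, interval, states):
    """Hypothetical depends_on_past check.

    states maps an execution_date to the final state of the same task's
    earlier run on that date.
    """
    if execution_date == start_date:
        return True  # the first scheduled run has no past to depend on
    previous = execution_date - interval
    return states.get(previous) == "success"


start = datetime(2016, 3, 25)
day = timedelta(days=1)
history = {datetime(2016, 3, 25): "failed"}  # the 3/25 run failed

print(can_run(datetime(2016, 3, 25), start, day, history))  # True
print(can_run(datetime(2016, 3, 26), start, day, history))  # False
```

With the 3/25 run failed, the 3/26 run is blocked, matching the behavior jlowin reports.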

(Something weird happens that I do need to fix, but I don't think it's what you're seeing. When a task doesn't run because of its dependencies, the command exits without failing, so the executor thinks it succeeded. But the task doesn't report a success, so the executor concludes that something went wrong, puts up the message "The airflow run command failed at reporting an error message", and, after three loops, fails the task. So you'll see the 3/26 task show up as failed with a try_number of 0... that's wrong.)
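The sequence in that parenthetical can be reproduced with a toy model. Everything below is hypothetical: `run_command` and `executor_check` are made-up names, the task instance is a plain dict, and `max_checks=3` simply mirrors "after three loops". It follows the order of events described, not Airflow's executor code.

```python
def run_command(ti, deps_met, outcome="success"):
    """Toy stand-in for `airflow run`; in this model it always exits 0."""
    if deps_met:
        ti["try_number"] += 1
        ti["state"] = outcome  # a real run records its own final state
    # If dependencies are unmet, nothing runs and the state stays untouched.
    return 0


def executor_check(ti, exit_code, max_checks=3):
    """Toy executor loop: exit code 0 with no recorded state is an error."""
    if exit_code != 0:
        ti["state"] = "failed"
        return ti["state"]
    for _ in range(max_checks):
        if ti["state"] is not None:
            return ti["state"]  # the task reported its own outcome
        # Nothing can change between checks in this toy; a real executor
        # would re-read the task state here.
        print("The airflow run command failed at reporting an error message")
    ti["state"] = "failed"  # give up after max_checks loops
    return ti["state"]


blocked = {"state": None, "try_number": 0}
print(executor_check(blocked, run_command(blocked, deps_met=False)),
      blocked["try_number"])  # failed 0, after three error messages

ran = {"state": None, "try_number": 0}
print(executor_check(ran, run_command(ran, deps_met=True)),
      ran["try_number"])  # success 1
```

The blocked run ends as `failed` with a `try_number` of 0, the inconsistent combination jlowin points out.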

I've updated my branch to pull the latest fixes from master -- do you mind checking it out again?

In my dags folder:

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()

dag_name = 'test_25'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': True,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=0,
        retry_delay=timedelta(seconds=0.2),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

syvineckruyk commented 8 years ago

@jlowin I'll check it out now, thanks! No need for apologies. I don't want to create a bunch of messy commits since you are already in deep, but let me know if I can do more or if you need anything from me other than testing.

syvineckruyk commented 8 years ago

@jlowin are you using CeleryExecutor ?

jlowin commented 8 years ago

Not for testing. I'm using the SequentialExecutor. Are you using the CeleryExecutor? I'm using a completely fresh, default version of Airflow.

syvineckruyk commented 8 years ago

@jlowin So running your current branch with your current code and depends_on_past=False seems to work as expected. I ran two consecutive days here.

(airflow) [airflow@*** ~]$ airflow backfill test_27 -s 2016-03-25 -e 2016-03-25
[2016-03-30 01:28:35,636] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:28:35,636] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:28:35,720] {models.py:317} INFO - Loaded DAG <DAG: test_27.test_subdag_with_pool>
[2016-03-30 01:28:35,720] {models.py:317} INFO - Loaded DAG <DAG: test_27>
[2016-03-30 01:28:35,749] {base_executor.py:35} INFO - Adding to queue: airflow run test_27 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 2 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:28:41,021] {celery_executor.py:66} INFO - [celery] queuing ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:28:41,127] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:28:46,066] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:28:51,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:28:56,054] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:01,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:06,080] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:11,062] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:16,091] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:21,068] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:26,087] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:31,068] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:36,049] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:41,061] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:29:46,060] {jobs.py:806} ERROR - Task instance ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 01:29:46,060] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0))]

(airflow) [airflow@*** ~]$ airflow backfill test_27 -s 2016-03-26 -e 2016-03-26
[2016-03-30 01:32:14,677] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-03-30 01:32:15,096] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:32:15,096] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:32:15,180] {models.py:317} INFO - Loaded DAG <DAG: test_27.test_subdag_with_pool>
[2016-03-30 01:32:15,181] {models.py:317} INFO - Loaded DAG <DAG: test_27>
[2016-03-30 01:32:15,208] {base_executor.py:35} INFO - Adding to queue: airflow run test_27 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 3 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:32:20,018] {celery_executor.py:66} INFO - [celery] queuing ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:32:20,122] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:25,086] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:30,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:35,051] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:40,086] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:45,094] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:50,049] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:32:55,071] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:00,081] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:05,119] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:10,056] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:15,081] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:20,084] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:25,044] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:33:30,106] {jobs.py:806} ERROR - Task instance ('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) failed
[2016-03-30 01:33:30,107] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_27', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0))]

[Screenshot]

I'll continue testing different configurations.

syvineckruyk commented 8 years ago

Next run (test_28): set retries=2 and retry_delay=timedelta(seconds=10) on the SubDagOperator:

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()

dag_name = 'test_28'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

(airflow) [airflow@*** ~]$ airflow backfill test_28 -s 2016-03-25 -e 2016-03-25
[2016-03-30 01:42:52,081] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-03-30 01:42:52,504] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:42:52,504] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:42:52,568] {models.py:317} INFO - Loaded DAG <DAG: test_28.test_subdag_with_pool>
[2016-03-30 01:42:52,568] {models.py:317} INFO - Loaded DAG <DAG: test_28>
[2016-03-30 01:42:52,596] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 4 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:42:58,018] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:42:58,130] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:03,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:08,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:13,056] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:18,079] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:23,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:28,091] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:33,058] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:38,205] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:43,063] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:48,099] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:53,196] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:43:58,099] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:03,093] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:08,062] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:13,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:18,023] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:18,028] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 4 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:44:23,015] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:44:23,059] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:28,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:33,146] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:38,159] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:43,074] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:48,059] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:53,083] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:44:58,145] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:03,037] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:08,120] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:13,142] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:18,013] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:18,018] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-25T00:00:00 --pickle 4 --local -s 2016-03-25T00:00:00 
[2016-03-30 01:45:23,021] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 01:45:23,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:28,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:33,107] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:38,036] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:43,105] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:48,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:53,084] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:45:58,176] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:46:03,155] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:46:08,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:46:13,079] {jobs.py:806} ERROR - Task instance ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 01:46:13,079] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 25, 0, 0))]

(airflow) [airflow@*** ~]$ airflow backfill test_28 -s 2016-03-26 -e 2016-03-26
[2016-03-30 01:47:35,730] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-03-30 01:47:36,161] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags
[2016-03-30 01:47:36,161] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 01:47:36,249] {models.py:317} INFO - Loaded DAG <DAG: test_28.test_subdag_with_pool>
[2016-03-30 01:47:36,249] {models.py:317} INFO - Loaded DAG <DAG: test_28>
[2016-03-30 01:47:36,276] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 5 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:47:41,020] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:47:41,121] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:47:46,050] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:47:51,158] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:47:56,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:01,175] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:06,138] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:11,055] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:16,097] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:21,053] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:26,175] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:31,082] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:36,104] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:41,168] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:46,047] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:51,053] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:56,020] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:48:56,025] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 5 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:49:01,023] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:49:01,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:06,100] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:11,094] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:16,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:21,066] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:26,113] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:31,071] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:36,065] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:41,064] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:46,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:46,030] {base_executor.py:35} INFO - Adding to queue: airflow run test_28 test_subdag_with_pool 2016-03-26T00:00:00 --pickle 5 --local -s 2016-03-26T00:00:00 
[2016-03-30 01:49:51,022] {celery_executor.py:66} INFO - [celery] queuing ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) through celery, queue=dc
[2016-03-30 01:49:51,169] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:49:56,040] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:01,064] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:06,077] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:11,094] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:16,044] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:21,042] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:26,037] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 01:50:31,080] {jobs.py:806} ERROR - Task instance ('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0)) failed
[2016-03-30 01:50:31,081] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 0 | kicked_off: 1 | failed: 1 | wont_run: 0 
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/bin/airflow", line 4, in <module>
    __import__('pkg_resources').run_script('airflow==1.6.2', 'airflow')
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 726, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1484, in run_script
    exec(code, namespace, namespace)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/EGG-INFO/scripts/airflow", line 15, in <module>
    args.func(args)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/bin/cli.py", line 87, in backfill
    pool=args.pool)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
airflow.exceptions.AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_28', 'test_subdag_with_pool', datetime.datetime(2016, 3, 26, 0, 0))]

This also seems to work as expected, but I will drill through the logs to make sure.

Should I be able to test with `depends_on_past=True`? Or is there still an open issue?

Thanks!

syvineckruyk commented 8 years ago

@jlowin The same DAG (with a version bump) enters a weird loop when run by the scheduler.

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

from airflow import settings

# Create the 10-slot test pool on first parse if it does not exist yet
session = settings.Session()
pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()
session.close()

dag_name = 'test_30'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

The loop is not infinite, but the sub-task queues, fails, and re-runs many times before the error bubbles up to the SubDagOperator.
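One way to quantify the loop is to count how often each task instance gets queued. With `retries=2` on the SubDagOperator and, assuming the inner tasks keep the default of no retries, `test_task_2_with_exit_1` would be expected to be queued once per SubDagOperator attempt (3 times in total); in the log below it is queued nine times during attempt 1 alone. The following is a minimal sketch that assumes log lines in the `Adding to queue: airflow run <dag_id> <task_id> <execution_date> ...` format shown here; `QUEUE_RE` and `count_queued` are hypothetical helpers, not part of Airflow.

```python
import re
import sys
from collections import Counter

# Hypothetical pattern for the base_executor.py lines:
# "Adding to queue: airflow run <dag_id> <task_id> <execution_date> ..."
QUEUE_RE = re.compile(r"Adding to queue: airflow run (\S+) (\S+) (\S+)")


def count_queued(log_lines):
    """Count how many times each (dag_id, task_id, execution_date) was queued."""
    counts = Counter()
    for line in log_lines:
        match = QUEUE_RE.search(line)
        if match:
            # groups() is a (dag_id, task_id, execution_date) tuple
            counts[match.groups()] += 1
    return counts


if __name__ == "__main__":
    # Usage: python count_queued.py <logfile>
    with open(sys.argv[1]) as f:
        for key, n in sorted(count_queued(f).items()):
            print(n, *key)
```

Each output line is the queue count followed by the dag_id, task_id and execution date, so a count well above the expected number of attempts points directly at the re-queued task.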

SubDagOperator log (node 1):

[2016-03-30 02:20:05,379] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:05,379] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:05,456] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:05,456] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:06,627] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:06,627] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:06,662] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:06,662] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:06,675] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 3
--------------------------------------------------------------------------------

[2016-03-30 02:20:06,689] {models.py:1060} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-25 00:00:00
[2016-03-30 02:20:06,733] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:12,012] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:12,120] {jobs.py:863} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:17,081] {jobs.py:863} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:22,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:22,094] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:27,017] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:27,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:32,060] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:37,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:37,085] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:42,016] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:42,189] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:47,055] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:52,066] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:20:52,082] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:20:57,021] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:20:57,032] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:02,076] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:07,071] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:07,082] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:12,015] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:12,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:17,050] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:22,043] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:22,055] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:27,013] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:27,023] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:32,043] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:37,057] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:37,070] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:42,013] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:42,043] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:47,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:52,053] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:21:52,067] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:21:57,018] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:21:57,025] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:02,062] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:07,059] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:07,071] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:22:12,012] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:22:12,024] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:17,054] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:22,050] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:22,063] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:22:27,026] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:22:27,034] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:32,063] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-03-30 02:22:37,054] {jobs.py:806} ERROR - Task instance ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 02:22:37,054] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 2 | failed: 1 | wont_run: 0 
[2016-03-30 02:22:37,055] {models.py:1125} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:22:37,056] {models.py:1137} INFO - Marking task as UP_FOR_RETRY
[2016-03-30 02:22:37,075] {models.py:1166} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:22:52,255] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:52,256] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:53,514] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:53,514] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:53,551] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:53,551] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:53,564] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 2 of 3
--------------------------------------------------------------------------------

[2016-03-30 02:22:53,582] {models.py:1060} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-25 00:00:00
[2016-03-30 02:22:53,629] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:22:59,017] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:22:59,118] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:04,083] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:09,072] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:09,083] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:14,011] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:14,021] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:19,064] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:24,079] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:24,092] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:29,017] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:29,030] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:34,063] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:39,093] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:39,106] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:44,018] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:44,029] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:49,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:54,069] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:23:54,079] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:23:59,015] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:23:59,027] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:04,042] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:09,039] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:09,051] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:24:14,014] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:24:14,024] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:19,040] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:24,042] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:24,055] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:24:29,012] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:24:29,023] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:34,067] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:24:39,061] {jobs.py:806} ERROR - Task instance ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 02:24:39,061] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-03-30 02:24:39,062] {models.py:1125} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:24:39,063] {models.py:1137} INFO - Marking task as UP_FOR_RETRY
[2016-03-30 02:24:39,077] {models.py:1166} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]

subdag operator log: node 2:

[2016-03-30 02:24:55,354] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:55,354] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:55,411] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:55,412] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:56,395] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:56,395] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:56,431] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:56,431] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:56,445] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 3 of 3
--------------------------------------------------------------------------------

[2016-03-30 02:24:56,459] {models.py:1060} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-25 00:00:00
[2016-03-30 02:24:56,499] {base_executor.py:35} INFO - Adding to queue: airflow run test_30.test_subdag_with_pool test_task_2_with_exit_1 2016-03-25T00:00:00 --local --pool test_pool -s 2016-03-25T00:00:00 -sd DAGS_FOLDER/steven_test/main.py 
[2016-03-30 02:25:01,022] {celery_executor.py:66} INFO - [celery] queuing ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) through celery, queue=dc
[2016-03-30 02:25:01,113] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:25:06,047] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:25:11,056] {jobs.py:863} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-03-30 02:25:16,049] {jobs.py:806} ERROR - Task instance ('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0)) failed
[2016-03-30 02:25:16,049] {jobs.py:863} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-03-30 02:25:16,049] {models.py:1125} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2630, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 872, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]
[2016-03-30 02:25:16,050] {models.py:1143} INFO - All retries failed; marking task as FAILED
[2016-03-30 02:25:16,066] {models.py:1166} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_30.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 25, 0, 0))]

sub-task log: from ui:

[2016-03-30 02:20:36,783] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:36,783] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:36,819] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:36,819] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:37,974] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:37,974] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:38,012] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:38,012] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:38,027] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:20:38,041] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:20:38,059] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:20:38,061] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmp00DHHF//tmp/airflowtmp00DHHF/test_task_2_with_exit_1WRaiEi
[2016-03-30 02:20:38,061] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:20:38,065] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:20:38,066] {bash_operator.py:77} INFO - hello
[2016-03-30 02:20:38,267] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:20:38,268] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:20:38,269] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:20:38,286] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:20:52,315] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:52,315] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:52,351] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:53,528] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:53,528] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:20:53,564] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:20:53,564] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:20:53,581] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:20:53,596] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:20:53,613] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:20:53,614] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpxFKcuu//tmp/airflowtmpxFKcuu/test_task_2_with_exit_1Z9sXI8
[2016-03-30 02:20:53,615] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:20:53,619] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:20:53,620] {bash_operator.py:77} INFO - hello
[2016-03-30 02:20:53,821] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:20:53,821] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:20:53,823] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:20:53,837] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:21:13,209] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:13,210] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:13,265] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:13,265] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:14,413] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:14,414] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:14,452] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:14,452] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:14,468] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:21:14,478] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:21:28,904] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:28,904] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:28,977] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:28,977] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:30,144] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:30,145] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:30,182] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:30,182] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:30,198] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:21:30,213] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:21:44,361] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:44,362] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:44,432] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:44,432] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:45,615] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:45,615] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:21:45,658] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:21:45,658] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:21:45,677] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:21:45,690] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:22:21,050] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:21,050] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:21,086] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:21,087] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:22,268] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:22,268] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:22:22,303] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:22:22,303] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:22:22,318] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:22:22,338] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:22:22,357] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:22:22,359] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpRDlYlN//tmp/airflowtmpRDlYlN/test_task_2_with_exit_1RCl2T3
[2016-03-30 02:22:22,359] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:22:22,363] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:22:22,364] {bash_operator.py:77} INFO - hello
[2016-03-30 02:22:22,565] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:22:22,566] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:22:22,567] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:22:22,582] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:23:07,922] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:07,922] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:07,960] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:07,960] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:09,128] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:09,128] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:09,168] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:09,168] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:09,184] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:23:09,198] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:23:09,215] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:23:09,216] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmp80YCR0//tmp/airflowtmp80YCR0/test_task_2_with_exit_1PCUde2
[2016-03-30 02:23:09,216] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:23:09,221] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:23:09,222] {bash_operator.py:77} INFO - hello
[2016-03-30 02:23:09,423] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:23:09,423] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:23:09,425] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:23:09,437] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:23:23,540] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:23,540] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:23,578] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:23,578] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:24,749] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:24,749] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:23:24,787] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:23:24,788] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:23:24,804] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:23:24,818] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:23:24,835] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:23:24,836] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpGTyqlC//tmp/airflowtmpGTyqlC/test_task_2_with_exit_14j9Ctl
[2016-03-30 02:23:24,836] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:23:24,841] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:23:24,842] {bash_operator.py:77} INFO - hello
[2016-03-30 02:23:25,043] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:23:25,043] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:23:25,045] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:23:25,062] {models.py:1166} ERROR - Bash command failed
[2016-03-30 02:24:00,177] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:00,177] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:00,242] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:00,242] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:01,406] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:01,406] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:01,446] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:01,446] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:01,461] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:24:01,471] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:24:15,665] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:15,665] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:15,737] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:15,737] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:16,911] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:16,911] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:16,953] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:16,953] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:16,968] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:24:16,978] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:24:31,183] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:31,184] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:31,240] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:31,240] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:32,404] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:32,404] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:24:32,441] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:24:32,441] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:24:32,457] {models.py:1028} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:24:32,476] {models.py:1033} INFO - Queuing into pool test_pool
[2016-03-30 02:25:12,436] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:12,436] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:12,502] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:25:12,502] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:25:13,684] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:13,684] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-03-30 02:25:13,720] {models.py:317} INFO - Loaded DAG <DAG: test_30.test_subdag_with_pool>
[2016-03-30 02:25:13,720] {models.py:317} INFO - Loaded DAG <DAG: test_30>
[2016-03-30 02:25:13,735] {models.py:1037} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-03-30 02:25:13,749] {models.py:1060} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-25 00:00:00
[2016-03-30 02:25:13,767] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-03-30 02:25:13,768] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmp_3kW3N//tmp/airflowtmp_3kW3N/test_task_2_with_exit_13LwrXU
[2016-03-30 02:25:13,768] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-03-30 02:25:13,773] {bash_operator.py:73} INFO - Output:
[2016-03-30 02:25:13,774] {bash_operator.py:77} INFO - hello
[2016-03-30 02:25:13,975] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-03-30 02:25:13,975] {models.py:1125} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1086, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-03-30 02:25:13,977] {models.py:1145} INFO - Marking task as FAILED.
[2016-03-30 02:25:13,991] {models.py:1166} ERROR - Bash command failed
syvineckruyk commented 8 years ago

@jlowin I noticed I had not set depends_on_past=False in the SubDagOperator specification. I performed another test with it set and observed the same behavior as above: multiple attempts before the failure bubbled up to the SubDagOperator.

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

# Create 'test_pool' with 10 slots if it does not already exist
pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()

dag_name = 'test_31'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 25)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=False,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)
jlowin commented 8 years ago

I've managed to replicate this behavior by dropping my scheduler_heartbeat_sec to 1 in airflow.cfg. I haven't found exactly where yet, but it looks like if you flood the CeleryExecutor with requests to run tasks, there's some critical moment where its guard is down and it schedules tasks that it's already run. That's my working hypothesis, anyway. What is your scheduler heartbeat interval set to?

syvineckruyk commented 8 years ago

@jlowin My settings:

job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5

Interesting observation! I'll run some tests with higher heartbeat intervals.

jlowin commented 8 years ago

@syvineckruyk please give my latest commits a try.

Here's what I found (I know I keep writing that, but the bittersweet news is that you're hitting lots of pain points!): the SchedulerJob has a prioritize_queued() method that looks for queued tasks and tries to run them. Critically, it includes any queued task, no matter which job queued it. The problem is that the subdag is being run by a BackfillJob with its OWN loop checking for queued tasks. So:

  1. The BackfillJob running a CeleryExecutor puts the task in "queued" status.
  2. The BackfillJob running a CeleryExecutor submits the task to the executor.
  3. Near-simultaneously, the SchedulerJob running a different CeleryExecutor (because it's on a different machine or process) submits the task to its own executor.
  4. Whichever request runs first flips the task from queued -> running. Normally, further run requests would be ignored, but the second request arrives and is honored, flipping the state back to queued.
  5. Repeat.
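The loop above can be reproduced in a toy model. This is a minimal sketch, not Airflow code: `ToyTaskInstance`, `handle_run_request` and `simulate` are hypothetical names, and the "guarded" branch only illustrates the "further run requests would be ignored" behavior that step 4 says should normally apply.

```python
from dataclasses import dataclass


@dataclass
class ToyTaskInstance:
    """Hypothetical, minimal stand-in for a task instance (not Airflow's class)."""
    key: str
    state: str = "queued"
    runs: int = 0


def handle_run_request(ti, guarded):
    """Apply one executor's request to run `ti`.

    guarded=False mimics step 4: a request that arrives after the task has
    started is still honored and flips the state back to "queued".
    guarded=True ignores any request for a task that is no longer queued.
    """
    if ti.state == "queued":
        ti.state = "running"
        ti.runs += 1
    elif not guarded:
        ti.state = "queued"


def simulate(rounds, guarded):
    ti = ToyTaskInstance("test_30.test_subdag_with_pool.test_task_2_with_exit_1")
    for _ in range(rounds):
        # The BackfillJob and the SchedulerJob poll at nearly the same moment,
        # so both see the task as queued and both submit a run request.
        submitters = [job for job in ("backfill", "scheduler") if ti.state == "queued"]
        for _job in submitters:
            handle_run_request(ti, guarded)
    return ti


buggy = simulate(rounds=5, guarded=False)
fixed = simulate(rounds=5, guarded=True)
print(buggy.state, buggy.runs)  # queued 5
print(fixed.state, fixed.runs)  # running 1
```

In the unguarded model every polling round ends with the task back in "queued", so it is run once per round indefinitely, which matches the repeated "Adding to queue" lines in the logs.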

Solution: prioritize_queued only considers tasks in the Scheduler's DagBag and ignores subdags on the assumption they are being run on their own.
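A rough sketch of that filtering idea, assuming queued tasks are plain `(dag_id, task_id, execution_date)` tuples and that subdags follow the `'<parent_dag_id>.<sd_id>'` naming used by `get_subdag` in this thread. The helper names are hypothetical; this is not the actual patch.

```python
from datetime import datetime


def is_subdag(dag_id, loaded_dag_ids):
    """Assumed naming test: a DAG whose prefix before the last '.' is another
    loaded DAG is treated as a subdag (e.g. 'test_30.test_subdag_with_pool')."""
    parent, sep, _ = dag_id.rpartition(".")
    return bool(sep) and parent in loaded_dag_ids


def queued_for_scheduler(queued, loaded_dag_ids):
    """Keep only queued tasks whose DAG is a top-level DAG in the scheduler's
    own DagBag; subdag tasks are left to their SubDagOperator's backfill."""
    return [
        ti for ti in queued
        if ti[0] in loaded_dag_ids and not is_subdag(ti[0], loaded_dag_ids)
    ]


# Example DagBag contents, as loaded in the logs above
LOADED = {"test_30", "test_30.test_subdag_with_pool"}
QUEUED = [
    ("test_30", "test_subdag_with_pool", datetime(2016, 3, 25)),
    ("test_30.test_subdag_with_pool", "test_task_2_with_exit_1", datetime(2016, 3, 25)),
    ("some_other_dag", "some_task", datetime(2016, 3, 25)),  # hypothetical DAG
]
print(queued_for_scheduler(QUEUED, LOADED))
```

Only the top-level SubDagOperator task survives the filter; the subdag's own task and the task from a DAG the scheduler never loaded are skipped.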

This isn't a complete solution -- the problem will crop up if someone is running a scheduler alongside any other kind of job, because the scheduler will try to run all queued tasks (in fact, I wonder if it's the cause of some of the weird backfill errors people have been seeing). To do this properly, I will make the scheduler only work with its "own" tasks, but that will involve more work. Please let me know if this solution works for now.
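One way to picture "only work with its own tasks" is to record which job queued each task and have every job filter on that. This is a hypothetical sketch: the `queued_by_job_id` field and the job ids are invented for illustration and are not assumed to exist in Airflow 1.6.

```python
from collections import namedtuple
from datetime import datetime

# Hypothetical record; 'queued_by_job_id' is an invented field for illustration.
QueuedTask = namedtuple(
    "QueuedTask", ["dag_id", "task_id", "execution_date", "queued_by_job_id"]
)


def tasks_owned_by(job_id, queued):
    """Return only the queued tasks that `job_id` itself queued, so one job
    never re-submits work that another job is driving."""
    return [t for t in queued if t.queued_by_job_id == job_id]


# Hypothetical job ids for the three kinds of job discussed in this thread
SCHEDULER_JOB_ID, SUBDAG_BACKFILL_JOB_ID, CLI_BACKFILL_JOB_ID = 1, 2, 3
day = datetime(2016, 3, 25)
queued = [
    QueuedTask("test_30", "test_subdag_with_pool", day, SCHEDULER_JOB_ID),
    QueuedTask("test_30.test_subdag_with_pool", "test_task_2_with_exit_1", day,
               SUBDAG_BACKFILL_JOB_ID),
    QueuedTask("test_31", "test_subdag_with_pool", day, CLI_BACKFILL_JOB_ID),
]
scheduler_work = tasks_owned_by(SCHEDULER_JOB_ID, queued)
```

Unlike the DagBag filter, an ownership check also covers a CLI backfill of a top-level DAG, which the scheduler would otherwise treat as its own.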

P.S. If you really want to stress this scenario, set your scheduler_heartbeat_sec to 0.

Thanks!

syvineckruyk commented 8 years ago

@jlowin This seems to be working much better. As far as actual execution, errors, and the bubbling up of errors go, everything seems in order. However, I am seeing more queuing activity than I would expect as I spin up workers. I need to get the logs together; I'm at the Strata conference all day today, so I most likely won't get to this before this evening.

Thanks again for all your help.

Also just wanted to clarify your last statement.

the problem will crop up if someone is running a scheduler along any other kind of job because the scheduler will try to run all queued tasks

Do you mean, for example, if someone were to run a backfill from the CLI while the scheduler is running? Are the SubDagOperator's "backfills" protected from this?

Thanks

jlowin commented 8 years ago

Yes, that's exactly what I mean: the Scheduler is subtly greedy and tries to run ALL queued tasks, even if they were queued by a SubDagOperator's backfill (or potentially a CLI backfill). I will fix that with the PR that addresses all of your issues.

syvineckruyk commented 8 years ago

I will fix that with the PR that addresses all of your issues.

You're making me sound like a basket case ;)

jlowin commented 8 years ago

I should have said: all of Airflow's issues!

syvineckruyk commented 8 years ago

@jlowin Here are my observations based on your latest commits:

In the subdag operator logs, it appears that every task gets queued twice. Is that normal?

The task instance logs look fine.

Going to try with depends_on_past next.

Code

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()

dag_name = 'test_35'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 28)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=False,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

Subdag operator log from worker 2 (no log on worker 1)

[root@***02 ~]# cat /home/airflow/airflow/logs/test_35/test_subdag_with_pool/2016-03-28T00\:00\:00 
[2016-04-01 18:21:00,748] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:00,748] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:00,811] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:00,811] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:01,968] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:01,968] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:02,006] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:02,006] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:02,024] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:21:02,044] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:21:02,089] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:07,016] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:07,128] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:12,046] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:17,042] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:17,059] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:22,015] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:22,023] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:27,101] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:32,077] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:32,107] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:37,086] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:37,106] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:42,109] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:47,068] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:47,086] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:52,016] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:21:52,026] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:21:57,055] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:02,074] {jobs.py:883} ERROR - Task instance ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:22:02,074] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 2 | failed: 1 | wont_run: 0 
[2016-04-01 18:22:02,075] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:22:02,076] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:22:02,098] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:22:18,834] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:18,834] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:18,877] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:18,877] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:20,021] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:20,021] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:20,071] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:20,071] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:20,087] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 2 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:22:20,105] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:22:20,143] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:22:25,014] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:22:25,118] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:30,061] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:35,046] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:35,065] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:22:40,021] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:22:40,033] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:45,061] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:50,062] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:22:55,066] {jobs.py:883} ERROR - Task instance ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:22:55,072] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:22:55,072] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:22:55,073] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:22:55,087] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:23:10,900] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:10,900] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:10,964] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:10,964] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:12,115] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:12,116] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:12,157] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:12,157] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:12,174] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 3 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:23:12,189] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:23:12,229] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:23:17,013] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:23:17,162] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:22,057] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:27,049] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:27,067] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:23:32,019] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:23:32,027] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:37,052] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:42,067] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:23:47,079] {jobs.py:883} ERROR - Task instance ('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:23:47,079] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:23:47,079] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:23:47,081] {models.py:1193} INFO - All retries failed; marking task as FAILED
[2016-04-01 18:23:47,101] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_35.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]

Subtask log from worker 1

[root@***01 ~]# cat /home/airflow/airflow/logs/test_35.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00 
[2016-04-01 18:21:38,306] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:38,307] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:38,339] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:38,339] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:39,339] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:39,339] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:39,372] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:39,372] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:39,387] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:21:39,397] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:21:54,182] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:54,182] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:54,215] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:54,215] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:55,211] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:55,212] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:21:55,247] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:21:55,247] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:21:55,261] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:21:55,274] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:21:55,293] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:21:55,295] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmphCzdRV//tmp/airflowtmphCzdRV/test_task_2_with_exit_1oD_rdc
[2016-04-01 18:21:55,295] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:21:55,298] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:21:55,299] {bash_operator.py:77} INFO - hello
[2016-04-01 18:21:55,501] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:21:55,502] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:21:55,503] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:21:55,516] {models.py:1216} ERROR - Bash command failed

Subtask log from worker 2

[root@usw2dcdpag02 ~]# cat /home/airflow/airflow/logs/test_35.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00 
[2016-04-01 18:22:29,202] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:29,202] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:29,264] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:29,264] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:30,440] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:30,440] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:30,477] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:30,477] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:30,496] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:22:30,507] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:22:44,889] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:44,889] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:44,954] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:44,954] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:46,122] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:46,122] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:22:46,162] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:22:46,162] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:22:46,178] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:22:46,193] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:22:46,210] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:22:46,211] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpAafXtN//tmp/airflowtmpAafXtN/test_task_2_with_exit_1QMBYdK
[2016-04-01 18:22:46,211] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:22:46,215] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:22:46,216] {bash_operator.py:77} INFO - hello
[2016-04-01 18:22:46,417] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:22:46,417] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:22:46,419] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:22:46,434] {models.py:1216} ERROR - Bash command failed
[2016-04-01 18:23:21,401] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:21,401] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:21,491] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:21,491] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:22,643] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:22,643] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:22,686] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:22,687] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:22,705] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:23:22,716] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:23:37,244] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:37,244] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:37,280] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:37,280] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:38,435] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:38,436] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:23:38,474] {models.py:317} INFO - Loaded DAG <DAG: test_35.test_subdag_with_pool>
[2016-04-01 18:23:38,474] {models.py:317} INFO - Loaded DAG <DAG: test_35>
[2016-04-01 18:23:38,490] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:23:38,505] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:23:38,524] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:23:38,525] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpnU0bjL//tmp/airflowtmpnU0bjL/test_task_2_with_exit_1kAx8gd
[2016-04-01 18:23:38,525] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:23:38,529] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:23:38,530] {bash_operator.py:77} INFO - hello
[2016-04-01 18:23:38,732] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:23:38,732] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:23:38,733] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:23:38,748] {models.py:1216} ERROR - Bash command failed

jlowin commented 8 years ago

@syvineckruyk you're referring to the duplicate lines near the top of the log (after "Starting attempt 1 of 3"), right? If so, that's normal. The line appears any time the task is sent to the executor. The first time, the task gets queued into the pool; the second time, it actually runs. That's why you don't see the duplicates for non-pooled tasks; they go straight to running.
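A toy model of why pooled tasks show up twice in the executor log, assuming the pool has a free slot by the second pass (the test pool here has 10 slots). `executor_submissions` is a hypothetical helper, and "Adding to queue" just mirrors the wording of the `base_executor.py` lines above.

```python
def executor_submissions(pooled):
    """Return one entry per time the task is handed to the executor (toy model)."""
    submissions = []
    state = None  # the task has no state yet
    while state != "running":
        submissions.append("Adding to queue")
        if pooled and state is None:
            state = "queued"   # 1st pass: the task only takes its place in the pool
        else:
            state = "running"  # slot available (or no pool): the task runs
    return submissions


print(len(executor_submissions(pooled=True)))   # 2
print(len(executor_submissions(pooled=False)))  # 1
```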

Maybe we can make that log a little clearer about executor queuing vs. Celery queuing vs. pool queuing.
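For illustration, log lines could name the queue explicitly. This is only a sketch of possible wording: `QUEUE_STAGES` and `queue_log_line` are hypothetical and do not correspond to Airflow's actual log messages.

```python
# Hypothetical wording; these are not Airflow's real log messages.
QUEUE_STAGES = {
    "executor": "added to the executor's queue",
    "celery": "sent to the Celery broker",
    "pool": "waiting for a free slot in pool",
}


def queue_log_line(stage, dag_id, task_id, pool=None):
    """Build a log line that names which of the three queues the task entered."""
    line = "[{} queue] {} / {}: {}".format(stage, dag_id, task_id, QUEUE_STAGES[stage])
    if stage == "pool" and pool:
        line += " " + pool
    return line


print(queue_log_line("pool", "test_35.test_subdag_with_pool", "test_task", "test_pool"))
# [pool queue] test_35.test_subdag_with_pool / test_task: waiting for a free slot in pool test_pool
```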

syvineckruyk commented 8 years ago

@jlowin I think I understand, but just to be 100% sure, I am talking about these messages:

[2016-04-01 18:21:02,089] {base_executor.py:35} INFO - Adding to queue: airflow run test_35.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:21:07,016] {celery_executor.py:67} INFO - [celery] queuing ('test_35.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc

These are also duplicated after "Starting attempt 2 of 3" and "Starting attempt 3 of 3". It sounds like we are talking about the same thing, though.

syvineckruyk commented 8 years ago

@jlowin

Code run

from datetime import datetime, timedelta
from airflow.models import DAG, Pool
from airflow.operators import BashOperator, SubDagOperator

import airflow
session = airflow.settings.Session()

pool = (
    session.query(Pool)
    .filter(Pool.pool == 'test_pool')
    .first())
if not pool:
    session.add(Pool(pool='test_pool', slots=10))
    session.commit()

dag_name = 'test_36'
test_pool = 'test_pool'

start_time = datetime(2016, 3, 28)

default_args = {
    'owner': 'Test',
    'depends_on_past': False,
    'start_date': start_time,
    # 'email': [email_to],
    # 'email_on_failure': True,
    # 'email_on_retry': True,
    # 'wait_for_downstream': False,
}

# Create the dag object
dag = DAG(dag_name,
          default_args=default_args,
          schedule_interval=timedelta(days=1))

def get_subdag(dag, sd_id, pool=None):
    subdag = DAG(
        dag_id='{parent_dag}.{sd_id}'.format(
            parent_dag=dag.dag_id,
            sd_id=sd_id),
        params=dag.params,
        default_args=dag.default_args,
        template_searchpath=dag.template_searchpath,
        user_defined_macros=dag.user_defined_macros,
    )

    t1 = BashOperator(
        task_id='test_task',
        bash_command='echo "hello" && sleep 0.2',
        dag=subdag,
        pool=pool
    )

    t2 = BashOperator(
        task_id='test_task_2_with_exit_1',
        bash_command='echo "hello" && sleep 0.2 && exit 1',
        dag=subdag,
        pool=pool
    )

    t2.set_upstream(t1)

    sdo = SubDagOperator(
        task_id=sd_id,
        subdag=subdag,
        retries=2,  # the whole subdag gets up to 3 attempts
        retry_delay=timedelta(seconds=10),
        dag=dag,
        depends_on_past=True,
    )

    return sdo

# sd1 = get_subdag(dag, sd_id='test_subdag')
sd2 = get_subdag(dag, sd_id='test_subdag_with_pool', pool=test_pool)

Subdag Operator - Worker 1 - Log


[root@***01 ~]# cat /home/airflow/airflow/logs/test_36/test_subdag_with_pool/2016-03-28T00\:00\:00 
[2016-04-01 18:47:53,011] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:53,011] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:53,062] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:47:53,062] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:47:54,082] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:54,082] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:47:54,117] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:47:54,117] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:47:54,132] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:47:54,147] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:47:54,186] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:47:59,020] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:47:59,116] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:04,088] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:09,087] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:14,066] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:14,087] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:48:19,013] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:48:19,021] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:24,060] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:29,065] {jobs.py:942} INFO - [backfill progress] waiting: 2 | succeeded: 0 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:29,075] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:48:34,062] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:48:34,070] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:39,053] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:44,063] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:49,069] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:49,079] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:48:54,015] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:48:54,158] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:48:59,036] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 2 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:04,054] {jobs.py:883} ERROR - Task instance ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:49:04,055] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 2 | failed: 1 | wont_run: 0 
[2016-04-01 18:49:04,055] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:49:04,056] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:49:04,069] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]

Subdag Operator - Worker 2 - Log

[root@***02 ~]# cat /home/airflow/airflow/logs/test_36/test_subdag_with_pool/2016-03-28T00\:00\:00 
[2016-04-01 18:49:21,265] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:21,265] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:21,366] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:21,366] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:22,509] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:22,509] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:22,546] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:22,546] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:22,560] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 2 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:49:22,575] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:49:22,613] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:49:28,013] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:49:28,106] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:33,048] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:38,050] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:38,064] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:49:43,012] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:49:43,019] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:48,045] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:49:53,042] {jobs.py:883} ERROR - Task instance ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:49:53,042] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:49:53,042] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:49:53,043] {models.py:1187} INFO - Marking task as UP_FOR_RETRY
[2016-04-01 18:49:53,059] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:50:08,057] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:08,057] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:08,134] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:08,134] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:09,274] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:09,274] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:09,314] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:09,314] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:09,332] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 3 of 3
--------------------------------------------------------------------------------

[2016-04-01 18:50:09,347] {models.py:1110} INFO - Executing <Task(SubDagOperator): test_subdag_with_pool> on 2016-03-28 00:00:00
[2016-04-01 18:50:09,382] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:50:14,018] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:50:14,142] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:19,053] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:24,079] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:29,045] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:29,057] {base_executor.py:35} INFO - Adding to queue: airflow run test_36.test_subdag_with_pool test_task_2_with_exit_1 2016-03-28T00:00:00 --local --pool test_pool -sd DAGS_FOLDER/steven_test/main.py 
[2016-04-01 18:50:34,013] {celery_executor.py:67} INFO - [celery] queuing ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) through celery, queue=dc
[2016-04-01 18:50:34,025] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:39,080] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:44,062] {jobs.py:942} INFO - [backfill progress] waiting: 1 | succeeded: 1 | kicked_off: 1 | failed: 0 | wont_run: 0 
[2016-04-01 18:50:49,073] {jobs.py:883} ERROR - Task instance ('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0)) failed
[2016-04-01 18:50:49,073] {jobs.py:942} INFO - [backfill progress] waiting: 0 | succeeded: 1 | kicked_off: 1 | failed: 1 | wont_run: 0 
[2016-04-01 18:50:49,078] {models.py:1175} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/subdag_operator.py", line 45, in execute
    executor=self.executor)
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 2729, in run
    job.run()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 175, in run
    self._execute()
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/jobs.py", line 951, in _execute
    raise AirflowException(msg)
AirflowException: ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
[2016-04-01 18:50:49,080] {models.py:1193} INFO - All retries failed; marking task as FAILED
[2016-04-01 18:50:49,092] {models.py:1216} ERROR - ------------------------------------------
Some tasks instances failed, here's the list:
[('test_36.test_subdag_with_pool', 'test_task_2_with_exit_1', datetime.datetime(2016, 3, 28, 0, 0))]
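The attempt counts in these logs follow from the `retries` settings. The SubDagOperator has `retries=2`, so it gets 3 attempts ("attempt 1 of 3"), while the BashOperator sub-tasks get no `retries` from `default_args` and have a single attempt ("attempt 1 of 1"). The sketch below is a simplified model of that bookkeeping, written to match the UP_FOR_RETRY / FAILED messages in the logs. It is not Airflow's actual implementation, and both helper names are hypothetical.

```python
def total_attempts(retries: int) -> int:
    """One initial try plus `retries` retries."""
    return retries + 1


def state_after_failure(try_number: int, retries: int) -> str:
    """Simplified rule: a failed try is retried while retries remain."""
    return "up_for_retry" if try_number <= retries else "failed"


# SubDagOperator in test_36: retries=2
subdag_states = [state_after_failure(t, 2) for t in range(1, total_attempts(2) + 1)]
print(subdag_states)  # ['up_for_retry', 'up_for_retry', 'failed']

# BashOperator sub-tasks: no retries configured
print(state_after_failure(1, 0))  # failed
```

This matches the run above: attempts 1 and 2 end with "Marking task as UP_FOR_RETRY", attempt 3 ends with "All retries failed; marking task as FAILED", and each sub-task run ends with "Marking task as FAILED."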

Sub-Task - Worker 1 - Log

[root@***01 ~]# cat /home/airflow/airflow/logs/test_36.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00 
[2016-04-01 18:48:55,206] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:55,207] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:55,297] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:55,297] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:56,281] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:56,282] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:56,314] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:56,314] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:56,329] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:48:56,344] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:48:56,359] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:48:56,361] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpHOJK_m//tmp/airflowtmpHOJK_m/test_task_2_with_exit_18ZJESg
[2016-04-01 18:48:56,361] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:48:56,365] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:48:56,366] {bash_operator.py:77} INFO - hello
[2016-04-01 18:48:56,567] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:48:56,567] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:48:56,569] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:48:56,584] {models.py:1216} ERROR - Bash command failed

Sub-Task - Worker 2 - Log

[root@***02 ~]# cat /home/airflow/airflow/logs/test_36.test_subdag_with_pool/test_task_2_with_exit_1/2016-03-28T00\:00\:00
[2016-04-01 18:48:39,716] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:39,717] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:39,772] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:39,772] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:40,917] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:40,917] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:48:40,955] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:48:40,955] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:48:40,971] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:48:40,982] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:49:31,616] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:31,616] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:31,673] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:31,673] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:32,830] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:32,831] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:32,908] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:32,908] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:32,923] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:49:32,932] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:49:47,250] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:47,250] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:47,321] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:47,321] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:48,476] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:48,476] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:49:48,515] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:49:48,515] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:49:48,531] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:49:48,545] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:49:48,562] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:49:48,563] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpk5MaUu//tmp/airflowtmpk5MaUu/test_task_2_with_exit_1hmLgze
[2016-04-01 18:49:48,563] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:49:48,567] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:49:48,568] {bash_operator.py:77} INFO - hello
[2016-04-01 18:49:48,769] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:49:48,769] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:49:48,771] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:49:48,786] {models.py:1216} ERROR - Bash command failed
[2016-04-01 18:50:18,447] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:18,448] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:18,536] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:18,536] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:19,678] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:19,678] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:19,715] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:19,715] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:19,731] {models.py:1078} INFO - 
--------------------------------------------------------------------------------
Queuing attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:50:19,741] {models.py:1083} INFO - Queuing into pool test_pool
[2016-04-01 18:50:39,222] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:39,223] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:39,260] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:39,260] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:40,413] {models.py:147} INFO - Filling up the DagBag from /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:40,413] {models.py:225} INFO - Importing /home/airflow/airflow/dags/steven_test/main.py
[2016-04-01 18:50:40,449] {models.py:317} INFO - Loaded DAG <DAG: test_36.test_subdag_with_pool>
[2016-04-01 18:50:40,449] {models.py:317} INFO - Loaded DAG <DAG: test_36>
[2016-04-01 18:50:40,465] {models.py:1087} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------

[2016-04-01 18:50:40,485] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00
[2016-04-01 18:50:40,502] {bash_operator.py:55} INFO - tmp dir root location: 
/tmp
[2016-04-01 18:50:40,504] {bash_operator.py:64} INFO - Temporary script location :/tmp/airflowtmpmrPZZ6//tmp/airflowtmpmrPZZ6/test_task_2_with_exit_1ApB6qp
[2016-04-01 18:50:40,504] {bash_operator.py:65} INFO - Running command: echo "hello" && sleep 0.2 && exit 1
[2016-04-01 18:50:40,508] {bash_operator.py:73} INFO - Output:
[2016-04-01 18:50:40,509] {bash_operator.py:77} INFO - hello
[2016-04-01 18:50:40,710] {bash_operator.py:80} INFO - Command exited with return code 1
[2016-04-01 18:50:40,710] {models.py:1175} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/airflow/.pyenv/versions/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/models.py", line 1136, in run
    result = task_copy.execute(context=context)
  File "/home/airflow/.pyenv/versions/2.7.10/envs/airflow/lib/python2.7/site-packages/airflow-1.6.2-py2.7.egg/airflow/operators/bash_operator.py", line 83, in execute
    raise AirflowException("Bash command failed")
AirflowException: Bash command failed
[2016-04-01 18:50:40,712] {models.py:1195} INFO - Marking task as FAILED.
[2016-04-01 18:50:40,727] {models.py:1216} ERROR - Bash command failed
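The worker 2 sub-task log also shows how long a pooled run sat in the pool before it executed. The following rough sketch assumes the `[YYYY-MM-DD HH:MM:SS,mmm]` timestamp prefix used throughout these logs. `pool_waits` is a hypothetical helper that pairs the most recent "Queuing into pool" line with the next "Executing <Task" line. The input lines are copied from the log above.

```python
import re
from datetime import datetime

TS_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2},\d{3})\]")

# Lines copied from the worker 2 sub-task log above.
LINES = [
    "[2016-04-01 18:48:40,982] {models.py:1083} INFO - Queuing into pool test_pool",
    "[2016-04-01 18:49:32,932] {models.py:1083} INFO - Queuing into pool test_pool",
    "[2016-04-01 18:49:48,545] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00",
    "[2016-04-01 18:50:19,741] {models.py:1083} INFO - Queuing into pool test_pool",
    "[2016-04-01 18:50:40,485] {models.py:1110} INFO - Executing <Task(BashOperator): test_task_2_with_exit_1> on 2016-03-28 00:00:00",
]


def parse_ts(line):
    """Return the leading log timestamp as a datetime, or None."""
    m = TS_RE.match(line)
    # %f right-pads the 3-digit milliseconds to microseconds
    return datetime.strptime(m.group(1), "%Y-%m-%d %H:%M:%S,%f") if m else None


def pool_waits(lines):
    """Seconds between the latest pool queuing and the next execution."""
    waits, queued_at = [], None
    for line in lines:
        ts = parse_ts(line)
        if ts is None:
            continue
        if "Queuing into pool" in line:
            queued_at = ts  # a re-queue replaces the earlier, unexecuted one
        elif "Executing <Task" in line and queued_at is not None:
            waits.append((ts - queued_at).total_seconds())
            queued_at = None
    return waits


print(pool_waits(LINES))
```

The queuing at 18:48:40 has no execution after it on this worker before the re-queue at 18:49:32, so the sketch discards it. The execution at 18:48:56 during that SubDagOperator attempt appears in the worker 1 sub-task log instead.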
jlowin commented 8 years ago

@syvineckruyk #1271 was just merged into master so I'm going to mark this issue as closed. If you have any other problems please let me know!

syvineckruyk commented 8 years ago

Thanks @jlowin, great work on this fix.