discoproject / disco

a Map/Reduce framework for distributed computing
http://discoproject.org
BSD 3-Clause "New" or "Revised" License
1.63k stars 241 forks source link

disco.error.DiscoError: Failed to submit jobpack: Job failed to start: invalid_job_inputs #642

Closed ghost closed 7 years ago

ghost commented 7 years ago

This does not work:

from __future__ import print_function

import sys, os, tarfile, argparse, codecs
from itertools import islice
from backports.lzma import LZMAFile
from six import binary_type
from subprocess import Popen, PIPE

from disco.core import Job, result_iterator, Disco
from disco.worker.pipeline.worker import Stage
from disco.worker import Params
from disco.worker.task_io import chain_reader, task_input_stream

def init(interface, params):
    ''' passes params to each map '''
    yield params

def iter_pgs_item(interface, params, some_number, inputs):
    raise Exception('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAFAFFA')

def reduce_key_count(interface, state, label, inp):
    out = interface.output(0)
    out.add('okay this no worky', 1)

class KeyCount(Job):    
    from virtualenvworker.pipeline import Worker
    pipeline = [("split", Stage("iter_pgs_item", process=iter_pgs_item, init=init))
    # ("group_label", Stage("reduce", process=reduce_key_count))
    ]

if __name__ == '__main__':
    from disco.worker import Params
    batch = sys.argv[1:]
    batch = ' https://os-r-object.vip.lvs.com/v1/KEY_1398efc9acef444f899a1018ddc9797e/payloads/'.join([''] +batch).split()
    params = Params(some='parameter here',
                    some_other='other param here')
    from wtf import KeyCount
    job = KeyCount()
    # [(str(i), str(i), b) for i, b in enumerate(batch)]
    job.run(input=batch, params=params)
    for items in result_iterator(job.wait(show=True)):
        print(items)

This sort of works:

from __future__ import print_function

import sys, os, tarfile, argparse, codecs
from itertools import islice
from backports.lzma import LZMAFile
from six import binary_type
from subprocess import Popen, PIPE

from disco.core import Job, result_iterator, Disco
from disco.worker.pipeline.worker import Stage
from disco.worker import Params
from disco.worker.task_io import chain_reader, task_input_stream

def init(interface, params):
    ''' passes params to each map '''
    yield params

def iter_pgs_item(interface, params, some_number, inputs):
    raise Exception('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAFAFFA')

def reduce_key_count(interface, state, label, inp):
    out = interface.output(0)
    out.add('okay this no worky', 1)

class KeyCount(Job):    
    from virtualenvworker.pipeline import Worker
    pipeline = [("split", Stage("iter_pgs_item", process=iter_pgs_item, init=init))
    # ("group_label", Stage("reduce", process=reduce_key_count))
    ]

if __name__ == '__main__':
    from disco.worker import Params
    batch = sys.argv[1:]
    batch = ' https://os-r-object.vip.lvs.com/v1/KEY_1398efc9acef444f899a1018ddc9797e/payloads/'.join([''] +batch).split()
    params = Params(some='parameter here',
                    some_other='other param here')
    from wtf import KeyCount
    job = KeyCount()

    job.run(input=[(str(i), str(i), b) for i, b in enumerate(batch)], params=params)
    for items in result_iterator(job.wait(show=True)):
        print(items)

Just this for a very very long time:

2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks
2016/12/16 17:04:18 master     New job initialized!
2016/12/16 17:04:18 master     Created job "KeyCount@5c9:e3cc2:3f7b1"
2016/12/16 17:04:18 master     Stage iter_pgs_item scheduled with 1 tasks

(venv_xk) jenkins@jenkins-4139:~/workspace/count_keys$ echo $LANG en_US.UTF-8 (venv_xk) jenkins@jenkins-4139:~/workspace/count_keys$ python --version Python 2.7.6 jmunsch@disco-master-5147:~$ erl version Erlang R16B03 (erts-5.10.4) [source] [64-bit] [async-threads:10] [kernel-poll:false]

ghost commented 7 years ago

I was able to resolve this issue by updating the input tuples.

To: (<index>,<index>, <input>)

rebooting all of the disco nodes

restarting disco-master: service disco-master restart

disco UI > configure > Misc. > max failures = 1

and purging the disco master jobs:

disco jobs | xargs disco kill
disco jobs | xargs disco purge