discoproject / disco

a Map/Reduce framework for distributed computing
http://discoproject.org
BSD 3-Clause "New" or "Revised" License

Disco pickling _compile from re.py when using virtualenv #609

Open trein opened 9 years ago

trein commented 9 years ago

Hi,

I'm currently developing some tools with Disco (v0.4.5) to automate a few repetitive tasks, and I came across a problem when using virtualenv. Here is a very simple Map/Reduce job that reproduces the problem I'm experiencing:

from disco.core import Job
from disco.util import kvgroup
import boto

AWS_ACCESS_KEY = 'AWS_ACCESS_KEY'
AWS_SECRET_ACCESS_KEY = 'AWS_SECRET_ACCESS_KEY'

class S3(object):
    def __init__(self):
        self.conn = boto.connect_s3(aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)

class WordCount(Job):
    def __init__(self):
        # The S3 wrapper becomes an attribute of the job object, which Disco
        # pickles into the jobpack when the job is dispatched to the workers.
        self.s3 = S3()
        super(WordCount, self).__init__(name='wordcount')

    @staticmethod
    def map(line, params):
        for word in line.split():
            yield word, 1

    @staticmethod
    def reduce(iter, params):
        for word, counts in kvgroup(sorted(iter)):
            yield word, sum(counts)

if __name__ == '__main__':
    from wordcount import WordCount

    job = WordCount().run(input=["http://discoproject.org/media/text/chekhov.txt"])
    for word, count in job.disco.result_iterator(job.wait(show=True)):
        print(word, count)

The error that I get just after dispatching the job is:

2015/01/21 19:00:25 master     map:0 assigned to revert-slave-3
2015/01/21 19:00:26 master     ERROR: Job failed: Worker at 'revert-slave-3' died: Traceback (most recent call last):
  File "/mnt/ebs/disco/data/host-3/37/wordcount@58d:d3a09:d5dfa/Library/Python/2.7/site-packages/disco/worker/__init__.py", line 335, in main
    task = cls.get_task()
  File "/mnt/ebs/disco/data/host-3/37/wordcount@58d:d3a09:d5dfa/Library/Python/2.7/site-packages/disco/worker/__init__.py", line 385, in get_task
    return Task(**dict((str(k), v) for k, v in cls.send('TASK').items()))
  File "/mnt/ebs/disco/data/host-3/37/wordcount@58d:d3a09:d5dfa/Library/Python/2.7/site-packages/disco/task.py", line 67, in __init__
    self.jobobjs = dPickle.loads(self.jobpack.jobdata)
  File "/Users/xxx/Development/project/env/lib/python2.7/re.py", line 229, in _compile
NameError: ("global name '_cache' is not defined", <function _compile at 0x14d9b90>, ('(?P<option>[^:=\\s][^:=]*)\\s*(?P<vi>[:=])\\s*(?P<value>.*)$', 0))

2015/01/21 19:00:26 master     WARN: Job killed
Status: [map] 1 waiting, 0 running, 0 done, 1 failed

It seems that Disco is trying to pickle the function _compile from the re module. I did some debugging and found that dPickle.save_func() does not correctly detect re._compile as a standard-library function when a virtualenv is activated.
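The mismatch is easy to see even without Disco. A minimal sketch (the printed directories are an assumption and depend on the virtualenv layout; compare with the env path in the traceback above):

import pickle
import re
from inspect import getfile
from os.path import dirname

# With the virtualenv activated, re.py is loaded from the environment's lib
# directory, while pickle.py still resolves to the interpreter's stdlib.
print(dirname(getfile(pickle)))  # e.g. /usr/lib/python2.7
print(dirname(getfile(re)))      # e.g. /Users/xxx/Development/project/env/lib/python2.7

# A whitelist built only from the first directory therefore classifies
# re._compile as a user-defined function, and dPickle tries to pickle it by value.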

I worked around the problem by patching dPickle.is_std() as follows:

from inspect import getfile
from os.path import dirname

import re as env_module
import pickle as sys_module

def is_std(module, stdlib=(dirname(getfile(sys_module)), dirname(getfile(env_module)),)):
    return module.__name__ != '__main__' and dirname(getfile(module)) in stdlib

This is a rather ugly and not very robust solution, in my opinion. I would appreciate your input on how it could be improved.
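One direction that might be less fragile (just a sketch, not tested inside dPickle) is to build the whitelist from several canonical standard-library modules at import time instead of hard-coding two of them:

import os
import pickle
import re
from inspect import getfile
from os.path import dirname, realpath

# Collect every directory a canonical stdlib module is actually loaded from,
# covering both the real stdlib and any copies/symlinks that virtualenv puts
# into the environment's lib directory.
_STDLIB_DIRS = set(realpath(dirname(getfile(m))) for m in (os, re, pickle))

def is_std(module):
    if module.__name__ == '__main__':
        return False
    try:
        path = realpath(dirname(getfile(module)))
    except TypeError:
        # True built-ins (no __file__) raise TypeError; treat them as stdlib.
        return True
    return path in _STDLIB_DIRS

That way nothing has to know whether the interpreter is running inside a virtualenv, but I'm not sure it covers all the cases dPickle cares about.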

Thanks,

pooya commented 9 years ago

Hey @trein, do you still see this problem if you cherry-pick commit 06cb9e00afa9870eca3c034335aadf6eb27f6040?

trein commented 9 years ago

@pooya Just tested this morning, and I'm still getting the same error. The function _compile is still being treated as non-stdlib.