d6t / d6tflow

Python library for building highly effective data science workflows
https://d6tflow.readthedocs.io/en/latest/
MIT License
951 stars 77 forks source link

Fail to run the example #12

Closed elypma closed 5 years ago

elypma commented 5 years ago

I am trying to run the d6tflow example (unmodified):

class TaskGetData(d6tflow.tasks.TaskPqPandas):  # output is persisted as parquet
    """Fetch the iris dataset and save it as a parquet-backed dataframe."""

    def run(self):
        dataset = sklearn.datasets.load_iris()
        feature_cols = ['feature{}'.format(idx) for idx in range(4)]
        frame = pd.DataFrame(dataset.data, columns=feature_cols)
        # Append the class labels as the final column 'y'.
        frame['y'] = dataset.target
        self.save(frame)  # d6tflow writes the dataframe to parquet

class TaskPreprocess(d6tflow.tasks.TaskCachePandas):  # result kept in memory
    """Optionally standardize the feature columns of the training data."""

    do_preprocess = luigi.BoolParameter(default=True)  # toggle scaling on/off

    def requires(self):
        # Upstream dependency: the raw iris data task.
        return TaskGetData()

    def run(self):
        data = self.input().load()  # load output of TaskGetData
        if self.do_preprocess:
            # Scale every column except the last one (the target 'y').
            features = data.iloc[:, :-1]
            data.iloc[:, :-1] = sklearn.preprocessing.scale(features)
        self.save(data)

class TaskTrain(d6tflow.tasks.TaskPickle):  # model is persisted via pickle
    """Fit an SVM classifier on the (optionally preprocessed) iris data."""

    do_preprocess = luigi.BoolParameter(default=True)

    def requires(self):
        # Propagate the preprocessing flag to the upstream task so the
        # dependency graph distinguishes scaled from unscaled runs.
        return TaskPreprocess(do_preprocess=self.do_preprocess)

    def run(self):
        data = self.input().load()
        classifier = sklearn.svm.SVC()
        # Last column 'y' is the target; everything before it is a feature.
        classifier.fit(data.iloc[:, :-1], data['y'])
        self.save(classifier)

# Run the full pipeline; d6tflow resolves and executes upstream tasks first.
if __name__ == "__main__":
    d6tflow.run(TaskTrain()) 

but this fails at the last line: self.save(model):

Welcome to d6tflow!
d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\parameter.py:284: UserWarning: Parameter "task_process_context" with value "None" is not of type string.
  warnings.warn('Parameter "{}" with value "{}" is not of type string.'.format(param_name, param_value))
INFO: Informed scheduler that task   TaskTrain_True_e00389f8b2   has status   PENDING
INFO: Informed scheduler that task   TaskPreprocess_True_e00389f8b2   has status   PENDING
INFO: Informed scheduler that task   TaskGetData__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) running   TaskPreprocess(do_preprocess=True)
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) done      TaskPreprocess(do_preprocess=True)
INFO: Informed scheduler that task   TaskPreprocess_True_e00389f8b2   has status   DONE
INFO: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) running   TaskTrain(do_preprocess=True)
ERROR: [pid 12304] Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) failed    TaskTrain(do_preprocess=True)
Traceback (most recent call last):
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\worker.py", line 199, in run
    new_deps = self._run_get_new_deps()
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\luigi\worker.py", line 139, in _run_get_new_deps
    task_gen = self.task.run()
  File "d:\dev\py\luigi\d6t_example_2.py", line 44, in run
    self.save(model)
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\tasks\__init__.py", line 135, in save
    self.output().save(data)
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\targets\__init__.py", line 288, in save
    pickle.dump(obj, open(self.path, "wb"), **kwargs)
TypeError: invalid file: WindowsPath('data/TaskTrain/TaskTrain_True_e00389f8b2-data.pkl')
INFO: Informed scheduler that task   TaskTrain_True_e00389f8b2   has status   FAILED
INFO: Worker Worker(salt=962715490, workers=1, host=FataMorgana, username=User, pid=12304) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 3 tasks of which:
* 1 complete ones were encountered:
    - 1 TaskGetData()
* 1 ran successfully:
    - 1 TaskPreprocess(do_preprocess=True)
* 1 failed:
    - 1 TaskTrain(do_preprocess=True)

This progress looks :( because there were failed tasks

===== Luigi Execution Summary =====

Traceback (most recent call last):
  File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\ptvsd_launcher.py", line 43, in <module>
    main(ptvsdArgs)
  File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\lib\python\ptvsd\__main__.py", line 434, in main
    run()
  File "c:\Users\User\.vscode\extensions\ms-python.python-2019.6.24221\pythonFiles\lib\python\ptvsd\__main__.py", line 312, in run_file
    runpy.run_path(target, run_name='__main__')
  File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 263, in run_path
    pkg_name=pkg_name, script_name=fname)
  File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 96, in _run_module_code
    mod_name, mod_spec, pkg_name, script_name)
  File "d:\apps\miniconda3\envs\py35\lib\runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "d:\dev\py\luigi\d6t_example_2.py", line 48, in <module>
    d6tflow.run(TaskTrain())  # , local_scheduler=False)  # single task
  File "d:\apps\miniconda3\envs\py35\lib\site-packages\d6tflow\__init__.py", line 101, in run
    raise RuntimeError('Exception found running flow, check trace')
RuntimeError: Exception found running flow, check trace

For some reason it cannot create the file. The 'data' directory is created, it contains 2 subdirectories: TaskGetData (containing a pq file as expected) and a TaskTrain directory (which is empty).

I am running on windows 10, and I am using python 3.5.

Any idea what might be wrong?

elypma commented 5 years ago

After some debugging it appeared that both JsonTarget and PickleTarget have to be adapted for the use of pathlib: files should be opened for reading and writing through the Path object's own open() member function, because on Python 3.5 the built-in open() does not accept pathlib.Path objects (path-like support was only added in Python 3.6).

class JsonTarget(DataTarget):
    """
    Saves to json, loads to dict

    """
    def load(self, cached=False, **kwargs):
        """
        Load from json to dict

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to json.load

        Returns: dict

        """
        def _read(path, **opts):
            # Path.open works on py3.5, where builtin open() rejects Path.
            with path.open('r') as fp:
                payload = json.load(fp)
            return payload['data']

        return super().load(_read, cached, **kwargs)

    def save(self, dict_, **kwargs):
        """
        Save dict to json

        Args:
            dict_ (dict): python dict
            kwargs : additional arguments to pass to json.dump

        Returns: filename

        """
        def _write(path, payload, **opts):
            with path.open('w') as fp:
                json.dump(payload, fp, **opts)

        # Make sure the target directory exists before writing.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        opts = {'indent': 4}
        opts.update(kwargs)  # caller options override the default indent
        _write(self.path, {'data': dict_}, **opts)
        return self.path

class PickleTarget(DataTarget):
    """
    Saves to pickle, loads to python obj

    """
    def load(self, cached=False, **kwargs):
        """
        Load from pickle to obj

        Args:
            cached (bool): keep data cached in memory
            **kwargs: arguments to pass to pickle.load

        Returns: deserialized python object

        """
        def read_pickle(path, **opts):
            # Use a context manager so the handle is closed even if
            # unpickling raises (the previous lambda leaked the handle).
            # Path.open also works on py3.5, where builtin open() rejects
            # pathlib.Path objects.
            with path.open('rb') as fhandle:
                return pickle.load(fhandle, **opts)

        # Forward kwargs through the helper, mirroring JsonTarget.load.
        return super().load(read_pickle, cached, **kwargs)

    def save(self, obj, **kwargs):
        """
        Save obj to pickle

        Args:
            obj (obj): python object
            kwargs : additional arguments to pass to pickle.dump

        Returns: filename

        """
        # Make sure the target directory exists before writing.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Context manager guarantees the file is flushed and closed,
        # which matters on Windows where open handles block later access.
        with self.path.open('wb') as fhandle:
            pickle.dump(obj, fhandle, **kwargs)
        return self.path