datahq / dataflows

DataFlows is a simple, intuitive, lightweight framework for building data processing flows in Python.
https://dataflows.org
MIT License

Pure streaming mode for handling very big files #139

Closed: roll closed this issue 4 years ago

roll commented 4 years ago

Overview

Currently, the base processor collects all the data into the results return variable:

dataflows.base.datastream_processor

    def results(self, on_error=None):
        try:
            ds = self._process()
            # NOTE: this list comprehension materializes every row of every
            # resource in memory before returning; this is the crux of the issue.
            results = [
                list(schema_validator(res.res, res, on_error=on_error))
                for res in ds.res_iter
            ]
        except Exception as exception:
            self.raise_exception(exception)
        return results, ds.dp, ds.merge_stats()

I've tested an alternative version of this snippet that iterates over the rows but ignores them, and it seems to work for simple cases like load -> dump_to_path (see the sketch below). Hopefully collecting the data is not vital for the framework, and we can introduce a pure streaming mode option.
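For reference, here is a minimal sketch of what such a pure streaming counterpart to results() might look like. This is an assumption about the shape of the change, not actual dataflows code; it mirrors the snippet above with the collection step dropped, and error handling is omitted for brevity:

    def process(self):
        # Sketch only: iterate over every row so that downstream processors
        # (e.g. dump_to_path) still see the full stream, but discard the rows
        # instead of accumulating them into a results list.
        ds = self._process()
        for res in ds.res_iter:
            for row in schema_validator(res.res, res):
                pass  # consume and discard
        return ds.dp, ds.merge_stats()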

WDYT @akariv @cschloer

Initial discussion:

roll commented 4 years ago

BTW,

I found flow.process in the code (not sure it's documented in the README).

@cschloer What if you try using it instead of flow.results?

akariv commented 4 years ago

Flow().results() returns all the processed data, so obviously it will take a lot of memory.

To avoid that you should use Flow().process() instead.
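For example, a minimal usage sketch (the file and directory names are placeholders; as the snippet earlier in the thread shows, results() returns a three-tuple, while process() returns only the datapackage and stats):

    from dataflows import Flow, load, dump_to_path

    # results() materializes every processed row in memory:
    data, dp, stats = Flow(load('data.csv'), dump_to_path('out')).results()

    # process() streams rows through the pipeline (here, straight to disk)
    # without keeping them around:
    dp, stats = Flow(load('data.csv'), dump_to_path('out')).process()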

The documentation mentions this, but I now see it's not very explicit:

What about large data files? In the above examples, the results are loaded into memory, which is not always preferable or acceptable. In many cases, we'd like to store the results directly onto a hard drive - without having the machine's RAM limit in any way the amount of data we can process.

I think this is an indication that the documentation should be improved...