datahq / dataflows

DataFlows is a simple, intuitive, lightweight framework for building data-processing flows in Python.
https://dataflows.org
MIT License

Checkpoints don't use schema for formatting #59

Closed: micimize closed this issue 5 years ago

micimize commented 5 years ago

It looks like the checkpoint formatters use static, unconfigurable serializers (see https://github.com/datahq/dataflows/blob/master/dataflows/processors/dumpers/file_formats.py#L50).

This creates serious issues: even if the datetime formatter were timezone-aware, it would then be adding timezone data to naive datetimes.
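
For illustration, a minimal sketch of the pitfall (the stamp-an-offset behavior is assumed for demonstration, not taken from file_formats.py):

from datetime import datetime, timezone

naive = datetime(2019, 1, 12, 3, 55)  # no tzinfo attached

# A serializer that blindly stamps an offset onto every value
# (hypothetical behavior, for demonstration only):
stamped = naive.replace(tzinfo=timezone.utc)

print(naive.isoformat())    # 2019-01-12T03:55:00
print(stamped.isoformat())  # 2019-01-12T03:55:00+00:00
# If the naive value actually represented local wall-clock time, the
# +00:00 offset asserts information the data never carried.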

akariv commented 5 years ago

I agree this is an issue: timezones are not handled properly. Thanks for reporting; I'll fix this ASAP.


akariv commented 5 years ago

Fixed in v0.0.38

micimize commented 5 years ago

Wait, even if you replaced the mechanism for checkpointing, don't all the dumpers still lose timezone info? Can we not leverage datapackage.save?
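
For context, a sketch of what preserving vs. losing the offset looks like (plain stdlib, not dataflows code; fromisoformat needs Python 3.7+):

from datetime import datetime, timezone, timedelta

aware = datetime(2019, 1, 12, 3, 55, tzinfo=timezone(timedelta(hours=-5)))

# isoformat()/fromisoformat() round-trips the offset intact:
restored = datetime.fromisoformat(aware.isoformat())
assert restored == aware and restored.tzinfo is not None

# A "%Y-%m-%d %H:%M:%S"-style strftime serializer drops it entirely:
lossy = aware.strftime("%Y-%m-%d %H:%M:%S")  # '2019-01-12 03:55:00'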

As an aside, the util I came up with for (I think) the equivalent of stream/unstream uses jsonpickle:

import tempfile
import jsonpickle as json

class LDJsonTemporaryFile(object):
    """
    Use a line-delimited tempfile for caching and reading back an arbitrary stream of data.
    * Will automatically delete the temporary file on close and __exit__
    * Used by `partition`
    """

    def __init__(self):
        # mode="r+" so the same handle can be written, rewound, and read back
        self.file = tempfile.TemporaryFile(mode="r+")

    def append(self, obj):
        # One pickled object per line (line-delimited JSON)
        self.file.write(json.dumps(obj) + "\n")

    def __iter__(self):
        # Rewind and replay every cached object in insertion order
        self.file.seek(0)
        for line in self.file:
            yield json.loads(line)

    def __enter__(self):
        return self

    def close(self):
        # Idempotent: `del self.file` would make a second close() (e.g. an
        # explicit close() followed by __exit__) raise AttributeError
        if self.file is not None:
            self.file.close()
            self.file = None

    def __exit__(self, *exc_info):
        self.close()
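
A hypothetical usage sketch (the rows and the `partition` context are made up for illustration):

# e.g. from inside a `partition` processor:
with LDJsonTemporaryFile() as cache:
    for row in ({"id": i, "value": i * i} for i in range(3)):
        cache.append(row)
    for row in cache:  # rows come back in insertion order
        print(row)
# the backing temporary file is closed (and deleted by the OS) on exit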