datahq / dataflows

DataFlows is a simple, intuitive, lightweight framework for building data processing flows in Python.
https://dataflows.org
MIT License

(bad) bug in processor duplicate #127

Closed cschloer closed 4 years ago

cschloer commented 4 years ago

The duplicate processor does not copy each row dictionary from the original resource when it produces the new resource. Both resources therefore hold references to the same dictionary objects, and any change a later processor makes to a row shows up in both resources.

The line is here.

The suggested fix is changing:

        yield row

to:

         yield dict((k, v) for k, v in row.items())
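To illustrate the kind of aliasing this fix is meant to prevent, here is a minimal sketch in plain Python. It does not use dataflows or the real duplicate processor; the row data and the variable names `original`, `aliased`, and `copied` are made up for the example.

```python
# Hypothetical rows standing in for a resource; not taken from the real dataset.
original = [{"Temperature": 28.5}, {"Temperature": 30.1}]

# Aliased "copy": a new list, but it holds the very same dict objects.
aliased = [row for row in original]

# Per-row shallow copy, equivalent to the suggested dict(...) expression.
copied = [dict((k, v) for k, v in row.items()) for row in original]

# A later step mutates a row of the original resource in place.
original[0]["Temperature"] = "20"

print(aliased[0]["Temperature"])  # the mutation is visible through the alias
print(copied[0]["Temperature"])   # the shallow copy keeps the old value
```

Note that this is a shallow copy: nested mutable values inside a row would still be shared.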

It's harder to reproduce with the standard processors (although it does affect the standard find_replace processor). See the https://github.com/BCODMO/bcodmo_processors/tree/lw-638-flows branch for the imports needed to reproduce it:

from bcodmo_processors.bcodmo_pipeline_processors import boolean_filter_rows

from datapackage_pipelines.lib import (
    load as standard_load,
    duplicate as standard_duplicate,
    find_replace as standard_find_replace,
)

from dataflows import Flow

l = standard_load.flow(
    {
        "from": "/home/conrad/Projects/whoi/laminar/datasets/large/BCODMO_Formatted.csv",
        "name": "res",
        "limit_rows": 5,
        "format": "csv",
    }
)
d = standard_duplicate.flow({"source": "res", "target-name": "res_new"})
bf = boolean_filter_rows(
    {"boolean_statement": "{Temperature} < 29", "resources": ["res_new"]}
)

fr = standard_find_replace.flow(
    {
        "fields": [
            {"name": "Temperature", "patterns": [{"find": ".*", "replace": "20",},],}
        ],
        "resources": ["res"],
    }
)

# Note the order: the boolean filter runs before find_replace.
flow = Flow(l, d, bf, fr)
flow.results()

errors out with:

TypeError: '<' not supported between instances of 'str' and 'decimal.Decimal' at row 1

In the example above, the find_replace flow changes the Temperature field in the resource "res" to be the string "20" rather than a number. The boolean_filter_rows flow, even though it is running before the find_replace processor AND using the different resource "res_new", runs into issues because it can no longer compare the string "20" with 29 (it is expecting a number).
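The error class itself is easy to reproduce outside any pipeline. The sketch below assumes only what the traceback shows, namely that a string ends up on the left of `<` and a `decimal.Decimal` on the right; how boolean_filter_rows builds that comparison internally is not shown here.

```python
from decimal import Decimal

# A numeric Temperature compares fine against the threshold.
assert Decimal("28.5") < Decimal("29")

# After find_replace, the value is the string "20" instead of a number.
replaced = "20"
try:
    replaced < Decimal("29")
    message = None
except TypeError as exc:
    message = str(exc)

print(message)
```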

akariv commented 4 years ago

@cschloer I couldn't reproduce this. The line you linked to is only called once per row (it's the saver function), while the loader yields the copy. The KVFile doesn't cache the objects; each object is serialized to JSON and deserialized again, so it simply can't be the same object.

This run shows it working as expected:

$ python
Python 3.7.0 (default, Oct 15 2018, 10:12:10) 
[Clang 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import dataflows as DF
>>> data=[{'field': i} for i in range(5)]
>>> DF.Flow(
  data,
  DF.duplicate(),
  DF.add_field('column', 'string', 'value', resources=-1),
  DF.printer()
).process()
res_1:
  #        field
       (integer)
---  -----------
  1            0
  2            1
  3            2
  4            3
  5            4
res_1_copy:
  #        field  column
       (integer)  (string)
---  -----------  ----------
  1            0  value
  2            1  value
  3            2  value
  4            3  value
  5            4  value
(<datapackage.package.Package object at 0x10af92240>, {})
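The serialization argument can also be checked in isolation. This is a sketch using only the standard `json` module, not the actual KVFile code: any value that has gone through a JSON round trip is a new object, so mutating it cannot affect the source row.

```python
import json

row = {"field": 1}

# Round trip through JSON, as a stand-in for storing and reloading a row.
restored = json.loads(json.dumps(row))

# Equal content, but a distinct object.
same_content = restored == row
same_object = restored is row

# Mutating the restored row leaves the source row untouched.
restored["field"] = "changed"
print(row, restored, same_content, same_object)
```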

It's hard for me to tell exactly what the problem is in your case, as I don't know how the custom processors work. If you remove the duplicate and the find_replace steps, does the flow work as intended?

cschloer commented 4 years ago

This is my bad. I just tried to recreate it again and realized the confusion came from the defaults: infer_strategy defaults to "full" while cast_strategy defaults to "do_nothing". That led to discrepancies between what I saw while the pipeline was running and what I saw once it was complete. Sorry!