frictionlessdata / datapackage-pipelines

Framework for processing data packages in pipelines of modular components.
https://frictionlessdata.io/
MIT License

Output pipe disappeared! when trying to load data with flows #150

Open zelima opened 6 years ago

zelima commented 6 years ago


I have a piece of code using dataflows that works fine when I run it directly with python my_flow.py:

# my_flow.py
from dataflows import Flow,  add_metadata, dump_to_path, load, printer

Flow(
    add_metadata(name="finance-vix"),
    load(
        load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
        headers=2
    ), printer(), dump_to_path(),
).process()

# Output
vixcurrent:
#     Date          VIX Open    VIX High     VIX Low    VIX Close
      (string)      (number)    (number)    (number)     (number)
----  ----------  ----------  ----------  ----------  -----------
1     1/2/2004         17.96       18.68       17.54        18.22
2     1/5/2004         18.45       18.49       17.44        17.49
3     1/6/2004         17.66       17.67       16.19        16.73
4     1/7/2004         16.72       16.75       15.5         15.5
5     1/8/2004         15.42       15.68       15.32        15.61
6     1/9/2004         16.15       16.88       15.57        16.75

However, if I wrap it inside a flow() function and run the pipeline via dpp run ./my-pipeline, or run it inside a Docker container via dpp server, it fails silently: the only error shown is Output pipe disappeared!.

# modified my_flow.py
from dataflows import Flow,  add_metadata, dump_to_path, load, printer

def flow(parameters, datapackage, resources, stats):
    return Flow(
        add_metadata(name="finance-vix"),
        load(
            load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
            headers=2
        ), printer(), dump_to_path()
    )

my pipeline-spec.yaml:

finance-vix-flow:
  pipeline:
    - flow: my_flow

my Dockerfile:

FROM frictionlessdata/datapackage-pipelines:latest

RUN apk --update --no-cache add libpq postgresql-dev libffi libffi-dev build-base python3-dev ca-certificates
RUN update-ca-certificates

WORKDIR /app
RUN apk add --update postgresql-client

ADD requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt

ADD . /app

CMD ["server"]

Error log on server:

(sink): >>> PROCESSED ROWS: 0
flow: DEBUG   :Starting new HTTP connection (1): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
flow: DEBUG   :Starting new HTTP connection (2): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
flow: DEBUG   :Starting new HTTP connection (1): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
flow: DEBUG   :Starting new HTTP connection (2): www.cboe.com:80
flow: DEBUG   :http://www.cboe.com:80 "GET /publish/ScheduledTask/MktData/datahouse/vixcurrent.csv HTTP/1.1" 200 135448
(sink): >>> PROCESSED ROWS: 0
flow: ERROR   :Output pipe disappeared!

Error log via dpp run ./finance-vix-flow

./finance-vix-flow: FAILURE, processed 0 rows
INFO    :RESULTS:
INFO    :FAILURE: ./finance-vix-flow 
ERROR log from processor flow:
+--------
| ERROR   :Output pipe disappeared!
+--------
OriHoch commented 6 years ago

the implementation is still a bit flaky, hopefully it will be improved in dpp v2

there are 3 possible problems with your code (or rather, with dpp..):

  1. need to set dpp:streaming: True on the resource
  2. stdout redirect is still buggy, so better not to use the printer
  3. not sure if required, but better to load the input datapackage and resources from the flow

fixed implementation (haven't tested):

# modified my_flow.py
from dataflows import Flow, add_metadata, dump_to_path, load, printer, update_resource

def flow(parameters, datapackage, resources, stats):
    return Flow(
        load((datapackage, resources)),
        add_metadata(name="finance-vix"),
        load(
            load_source='http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv',
            headers=2
        ), 
        update_resource('vixcurrent', **{'dpp:streaming': True}),
        dump_to_path()
    )

if __name__ == '__main__':
    Flow(flow({}, {'resources': []}, [], {}), printer()).process()
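
On point 2 above (avoiding printer() because of the stdout redirect), one way to still see rows while debugging is to log them to stderr instead. This is a minimal, untested-against-dpp sketch: log_rows is a hypothetical helper, not part of dataflows or dpp, and it assumes that a generator taking a rows argument can be used as a Flow step in place of printer().

```python
import logging
import sys

# Send log output to stderr so nothing is written to stdout.
logging.basicConfig(level=logging.INFO, stream=sys.stderr)
logger = logging.getLogger("my_flow")


def log_rows(rows):
    """Log each row to stderr and pass it through unchanged.

    Hypothetical helper: intended as a drop-in replacement for printer().
    """
    for index, row in enumerate(rows, start=1):
        logger.info("row %d: %r", index, row)
        yield row
```

Whether this actually avoids the Output pipe disappeared! error has not been verified here; it only keeps debugging output off stdout.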
akariv commented 6 years ago

This is true for version 1.7.2. Version 2.0.0 has introduced some modifications, including:

@zelima does this reproduce in v2.0.0?

zelima commented 6 years ago

@akariv yes, at the time the issue was opened this was happening in v2.0.0 as well. Setting dpp:streaming: True fixed it in both versions, though.
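
For anyone landing here later, this is roughly what the flag looks like on the descriptor. A minimal sketch in plain Python (no dataflows or dpp imports), assuming a datapackage descriptor is a dict with a resources list; mark_streaming and non_streaming are hypothetical helper names, not functions from either library. In a real flow, update_resource('vixcurrent', **{'dpp:streaming': True}) from the comment above does the marking.

```python
import copy

STREAMING_KEY = "dpp:streaming"


def mark_streaming(descriptor, names=None):
    """Return a copy of the descriptor with dpp:streaming set to True.

    If names is given, only resources whose name is in it are marked.
    """
    result = copy.deepcopy(descriptor)
    for resource in result.get("resources", []):
        if names is None or resource.get("name") in names:
            resource[STREAMING_KEY] = True
    return result


def non_streaming(descriptor):
    """List the names of resources that do not carry the streaming flag."""
    return [
        resource.get("name")
        for resource in descriptor.get("resources", [])
        if not resource.get(STREAMING_KEY)
    ]


if __name__ == "__main__":
    package = {
        "name": "finance-vix",
        "resources": [{"name": "vixcurrent", "path": "vixcurrent.csv"}],
    }
    print(non_streaming(package))
    print(non_streaming(mark_streaming(package, {"vixcurrent"})))
```

Running non_streaming on the descriptor before handing it to dpp is a quick way to spot resources that were loaded without the flag.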