mohaseeb / beam-nuggets

Collection of transforms for the Apache beam python SDK.
http://mohaseeb.com/beam-nuggets/
MIT License

AttributeError: 'Read' object has no attribute 'source' #19

Closed pebcakerror closed 5 years ago

pebcakerror commented 5 years ago

New to Python and new to Dataflow...

I'm at a loss as to what I'm missing. Is this potentially an error in beam-nuggets?

$ python run_pipeline.py --requirements_file requirements.txt --runner DataflowRunner --staging_location [redacted]/staging --temp_location [redacted]/temp --template_location [redacted]/template --project [redacted]
Traceback (most recent call last):
  File "run_pipeline.py", line 73, in <module>
    run()
  File "run_pipeline.py", line 58, in run
    table_name=rdb_table_name,
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 887, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/transforms/ptransform.py", line 510, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 480, in apply
    return self.apply(transform, pvalueish)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/pipeline.py", line 516, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 193, in apply
    return m(transform, input, options)
  File "/[redacted]/.local/lib/python2.7/site-packages/apache_beam/runners/dataflow/dataflow_runner.py", line 845, in apply_Read
    if hasattr(transform.source, 'format'):
AttributeError: 'Read' object has no attribute 'source'
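
# run_pipeline.py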
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions, SetupOptions
from beam_nuggets.io import relational_db

PROJECT = ""

class CopyRDBToBQOptions(PipelineOptions):
    """
    Runtime Parameters given during template execution
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--tablename',
            type=str,
            help='Table Name'
        )

def run():
    """
    Pipeline entry point; runs all the necessary processes:
    - Read from Relational DB
    - Save dict to text file in a Storage Bucket
    - Commit to BigQuery
    """

    # Retrieve project ID and append to PROJECT from GoogleCloudOptions
    global PROJECT

    # Initialize runtime parameters as object
    rdbtobq_options = PipelineOptions().view_as(CopyRDBToBQOptions)

    pipeline_options = PipelineOptions()
    # Save main session state so pickled functions and classes
    # defined in __main__ can be unpickled
    pipeline_options.view_as(SetupOptions).save_main_session = True

    source_config = relational_db.SourceConfiguration(
        drivername='postgresql+pg8000',
        host='[redacted]',
        port=0000,
        username='[redacted]',
        password='[redacted]',
        database='[redacted]',
    )
    rdb_table_name = rdbtobq_options.tablename
    bq_table = '[redacted].{}'.format(rdbtobq_options.tablename)

    # Beginning of the pipeline
    p = beam.Pipeline(options=pipeline_options)

    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name=rdb_table_name,
    )

    records | beam.io.gcp.bigquery.WriteToBigQuery(
        bq_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        method='FILE_LOADS'
    )

if __name__ == '__main__':
    run()
# requirements.txt
apache-beam[gcp]
google-cloud
google-cloud-bigquery
google-cloud-dataflow
google-cloud-storage
beam-nuggets
pebcakerror commented 5 years ago

I was able to get this to run by downgrading Beam to 2.5.0. Beam Nuggets seems to indicate it's compatible with the latest. https://github.com/mohaseeb/beam-nuggets/blob/55441d9fd9b9e02ebe1c3434780472e1a7bf03d2/setup.py#L8
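A minimal sketch of one way to do that downgrade, assuming the same requirements.txt shown above is also shipped to the workers: pin the version there and install the same pinned version locally before launching the job.

# requirements.txt (pinned Beam version; an assumption, not taken from the report above)
apache-beam[gcp]==2.5.0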

2514millerj commented 5 years ago

I have sporadically run into this issue before. My solution was to use a local version of beam-nuggets and change Read to something else, like DBRead. I think it is a Dataflow-specific issue, and I am not sure this is a real fix, so I have not submitted a pull request for it.

pebcakerror commented 5 years ago

I was able to solve this by switching to the model shown in https://github.com/jamesmoore255/dataflow_template

For beam_nuggets it seemed important to wrap the pipeline in a with statement:

with beam.Pipeline(options=options) as p:
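
In other words, the pipeline construction and the transforms end up inside the with block. A rough sketch, reusing the names from the snippet at the top of this issue (exiting the block runs the pipeline, so no explicit p.run() is needed):

with beam.Pipeline(options=pipeline_options) as p:
    records = p | "Reading records from db" >> relational_db.Read(
        source_config=source_config,
        table_name=rdb_table_name,
    )
    records | "Writing to BigQuery" >> beam.io.gcp.bigquery.WriteToBigQuery(
        bq_table,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
        method='FILE_LOADS',
    )
    # exiting the with block is equivalent to calling p.run() and waiting for completion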

I did have to downgrade Beam to 2.5

It was also important to understand that inside beam.ParDo there is no access to globals: https://cloud.google.com/dataflow/docs/resources/faq#how-do-i-handle-nameerro

If you're getting a NameError when you execute your pipeline using the Cloud Dataflow service but not when you execute locally (i.e. using the DirectRunner), your DoFns may be using values in the global namespace that are not available on the Cloud Dataflow worker.

By default, global imports, functions, and variables defined in the main session are not saved during the serialization of a Cloud Dataflow job. If, for example, your DoFns are defined in the main file and reference imports and functions in the global namespace, you can set the --save_main_session pipeline option to True. This will cause the state of the global namespace to be pickled and loaded on the Cloud Dataflow worker.

Notice that if you have objects in your global namespace that cannot be pickled, you will get a pickling error. If the error is regarding a module that should be available in the Python distribution, you can solve this by importing the module locally, where it is used.

For example, instead of:

import re
…
def myfunc():
  # use re module

use:

def myfunc():
  import re
  # use re module

Alternatively, if your DoFns span multiple files, you should use a different approach to packaging your workflow and managing dependencies.
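
Tying that FAQ back to the Beam Python SDK: the option it describes can be passed on the command line as --save_main_session, or set in code, which the snippet at the top of this issue already does:

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

pipeline_options = PipelineOptions()
# Pickle the state of __main__ so globals are available on the Dataflow workers
pipeline_options.view_as(SetupOptions).save_main_session = True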

mohaseeb commented 5 years ago

interesting; will have a look

mohaseeb commented 5 years ago

Hi @pebcakerror, I couldn't reproduce the problem when I tried this job. I used the following as the requirements to be installed on the workers (i.e. the requirements.txt):

apache-beam[gcp]
google-cloud
google-cloud-bigquery
google-cloud-dataflow
google-cloud-storage
beam-nuggets

This would install apache-beam 2.13.0 (the latest at the moment) on the workers. I also tried the example both with and without the with beam.Pipeline(options=options) as p: block.

Could this issue be related to an earlier apache-beam version? 2.13.0 was released on 04/Jun.

mattwesthoff commented 5 years ago

Also getting this issue. @mohaseeb, the job you linked only uses relational_db.Write, not relational_db.Read. The error seems specific to Read needing a source attribute.

mohaseeb commented 5 years ago

@mattoraptor you are right; I missed that. There is a fix to this issue by @2514millerj that I'm going to merge shortly.

mohaseeb commented 5 years ago

The fix is uploaded to PyPI (version 0.15.1). relational_db.Read is renamed to relational_db.ReadFromDB. See https://github.com/mohaseeb/beam-nuggets/pull/22#discussion_r303217640 for more info.
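
For anyone updating an existing pipeline, the read step from the snippet at the top of this issue would change roughly like this (a sketch; source_config and rdb_table_name as defined there):

from beam_nuggets.io import relational_db

records = p | "Reading records from db" >> relational_db.ReadFromDB(
    source_config=source_config,  # same SourceConfiguration as before
    table_name=rdb_table_name,
)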

rafaelgildin commented 4 years ago

I'm developing an ETL process that creates a job in Google Cloud Dataflow, reading data from PostgreSQL and writing to BigQuery. When I run the code, I receive the message No module named 'beam_nuggets'. I've already tried reinstalling the beam-nuggets package on my local machine and in the GCP console, but the issue continues. Does anyone know how I can fix it?