mohaseeb / beam-nuggets

Collection of transforms for the Apache beam python SDK.
http://mohaseeb.com/beam-nuggets/
MIT License

AttributeError: 'OracleDialect_cx_oracle' object has no attribute 'default_schema_name' #29

Open hanknac opened 4 years ago

hanknac commented 4 years ago

I am trying to use beam-nuggets to access Oracle, and I am receiving:

AttributeError: 'OracleDialect_cx_oracle' object has no attribute 'default_schema_name'

Code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db

with beam.Pipeline(options=PipelineOptions()) as p:
    source_config = relational_db.SourceConfiguration(
        drivername='oracle+cx_oracle',
        host='...',
        port=1521,
        username='',
        password=''
    )
    records = p | "Reading records from db" >> relational_db.ReadFromDB(
        source_config=source_config,
        table_name='CONTACTS',
        query='select * from EHR.CONTACTS'  # optional. When omitted, all table records are returned.
    )
    records | 'Writing to stdout' >> beam.Map(print)

Stack trace:

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db.py in process(self, element)
     94         try:
     95             if query:
---> 96                 for record in db.query(table_name, query):
     97                     yield record
     98             else:

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in query(self, table_name, query)
    267
    268     def query(self, table_name, query):
--> 269         table = self._open_table_for_read(table_name)
    270         for record in table.query_records(self._session, query):
    271             yield record

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in _open_table_for_read(self, name)
    302         return self._open_table(
    303             name=name,
--> 304             get_table_f=load_table
    305         )
    306

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in _open_table(self, name, get_table_f, get_table_f_params)
    317         if not table:
    318             self._name_to_table[name] = (
--> 319                 self._get_table(name, get_table_f, get_table_f_params)
    320             )
    321         table = self._name_to_table[name]

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in _get_table(self, name, get_table_f, get_table_f_params)
    323
    324     def _get_table(self, name, get_table_f, get_table_f_params):
--> 325         table_class = get_table_f(self._session, name, **get_table_f_params)
    326         if table_class:
    327             table = _Table(table_class=table_class, name=name)

/opt/conda/lib/python3.7/site-packages/beam_nuggets/io/relational_db_api.py in load_table(session, name)
    378     table_class = None
    379     engine = session.bind
--> 380     if engine.dialect.has_table(engine, name):
    381         metadata = MetaData(bind=engine)
    382         table_class = create_table_class(Table(name, metadata, autoload=True))

/opt/conda/lib/python3.7/site-packages/sqlalchemy/dialects/oracle/base.py in has_table(self, connection, table_name, schema)
   1356     def has_table(self, connection, table_name, schema=None):
   1357         if not schema:
-> 1358             schema = self.default_schema_name
   1359         cursor = connection.execute(
   1360             sql.text(

AttributeError: 'OracleDialect_cx_oracle' object has no attribute 'default_schema_name'

mohaseeb commented 4 years ago

Hi @hanknac, I haven't had time to look at this, but I can see that you are using Python 3. Can you try with Python 2? So far I've used beam-nuggets with Python 2. I'm planning to update it for Python 3 soon.

ferreira-guilherme commented 4 years ago

@hanknac same problem here. Were you able to solve the problem?

@mohaseeb I'm using Python 2 and the mssql dialect (pyodbc driver). In relational_db_api.load_table() you check whether the table exists with "engine.dialect.has_table(engine, name)", but that SQLAlchemy method reads the dialect's default_schema_name property, and at that point the property does not exist yet.

As a quick workaround, I did the following:

In the relational_db file, inside the class "_ReadFromRelationalDBFn", I added this line after db.start_session():

db._session.bind.connect()
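
For readers wondering why a single connect() call helps, here is a minimal toy model of the failure. It rests on an assumption not confirmed in this thread: that the SQLAlchemy dialect only sets default_schema_name while it initializes itself on the engine's first connection. ToyDialect, ToyConnection, ToyEngine and table_exists are hypothetical stand-ins written for illustration; they are not SQLAlchemy or beam-nuggets classes.

```python
class ToyDialect:
    """Hypothetical stand-in for a SQLAlchemy dialect (not the real class)."""

    def initialize(self, connection):
        # Assumption: the real dialect fills this in during first-connect setup.
        self.default_schema_name = connection.current_schema

    def has_table(self, engine, table_name, schema=None):
        if not schema:
            # Raises AttributeError if initialize() has not run yet.
            schema = self.default_schema_name
        return (schema, table_name) in engine.tables


class ToyConnection:
    """Hypothetical connection that only knows its current schema."""

    def __init__(self, current_schema):
        self.current_schema = current_schema


class ToyEngine:
    """Hypothetical engine that initializes its dialect lazily on first connect."""

    def __init__(self, tables, current_schema):
        self.dialect = ToyDialect()
        self.tables = set(tables)
        self._current_schema = current_schema
        self._initialized = False

    def connect(self):
        connection = ToyConnection(self._current_schema)
        if not self._initialized:
            self.dialect.initialize(connection)
            self._initialized = True
        return connection


def table_exists(engine, name, connect_first):
    if connect_first:
        # Plays the role of db._session.bind.connect() in the workaround.
        engine.connect()
    return engine.dialect.has_table(engine, name)


def demo():
    tables = {("EHR", "CONTACTS")}

    # Without a prior connection the attribute lookup fails.
    cold = ToyEngine(tables, "EHR")
    try:
        table_exists(cold, "CONTACTS", connect_first=False)
        failed = False
    except AttributeError:
        failed = True

    # Connecting once first lets the same check succeed.
    warm = ToyEngine(tables, "EHR")
    found = table_exists(warm, "CONTACTS", connect_first=True)
    return failed, found
```

In this model, demo() hits the AttributeError on the engine that was never connected and finds the table on the one that was, which matches the behaviour reported above. Whether the real dialects behave exactly this way depends on the SQLAlchemy version in use.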

rodalgon commented 4 years ago

I'm facing the same problem as @ferreira-guilherme, but in my case I'm using the SQL Server dialect:

from __future__ import print_function

import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import relational_db


def run(argv=None, save_main_session=True):
    with beam.Pipeline(options=PipelineOptions()) as p:
        source_config = relational_db.SourceConfiguration(
            drivername='mssql+pymssql',
            host='localhost',
            port=1433,
            username='...',
            password='...',
            database='...'
        )
        records = p | "Reading records from db" >> relational_db.ReadFromDB(
            source_config=source_config,
            table_name='XXX',
            query='select YYYYYY from DM_XXX'  # optional. When omitted, all table records are returned.
        )
        records | 'Writing to stdout' >> beam.Map(print)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Result:

File "/home/rodrigo/projects/liq/workspace/dataflow/template_teste/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 325, in _get_table
    table_class = get_table_f(self._session, name, **get_table_f_params)
File "/home/rodrigo/projects/liq/workspace/dataflow/template_teste/lib/python3.6/site-packages/beam_nuggets/io/relational_db_api.py", line 380, in load_table
    if engine.dialect.has_table(engine, name):
File "/home/rodrigo/projects/liq/workspace/dataflow/template_teste/lib/python3.6/site-packages/sqlalchemy/dialects/mssql/base.py", line 2188, in wrap
    dbname, owner = _owner_plus_db(dialect, schema)
File "/home/rodrigo/projects/liq/workspace/dataflow/template_teste/lib/python3.6/site-packages/sqlalchemy/dialects/mssql/base.py", line 2227, in _owner_plus_db
    return None, dialect.default_schema_name
AttributeError: 'MSDialect_pymssql' object has no attribute 'default_schema_name' [while running 'Reading records from db/ParDo(_ReadFromRelationalDBFn)']

I've tried your solution, @ferreira-guilherme, and it worked for me. Thanks!