coriolinus opened this issue 7 years ago
Hi @coriolinus, could you post the relevant parts of your etl.py script?
Sure. By necessity, these are excerpts; they're verbose enough as it is.
Lines 108-167:
def load_table(table, human_readable, output_tablename, load_override=load, progress=100):
    if load_override:
        with timed_op():
            print("Loading {} into {}...".format(human_readable, output_tablename))
            table = table.progress(progress)
            table.todb(connect_dest(auth, destination), tablename=output_tablename)
@contextmanager
def mini_etl(human_readable, query,
             output_tablename,
             transform_func,
             tf_args=[], tf_kwargs={},
             progress=100,
             use_table=None,
             load_override=load,
             show_override=show_tables):
    """
    Perform a batch of ETL operations for a single PETL table.

    This should reduce the amount of code duplication necessary for
    standard table migration.

    query: raw SQL query which populates the initial table state
    output_tablename: name of the destination table
    transform_func: this function should transform the input PETL table into
        the output PETL table. Called like
        `transform_func(table, *tf_args, **tf_kwargs)`
    tf_args, tf_kwargs: additional positional and keyword arguments for the
        transform func
    progress: batch size when loading the output table. Default 100.
    use_table: if not None, ignore query and instead use this PETL table as input
    load_override: override whether to load this table
    show_override: override whether to show this table

    This is a context manager so that you have access to the table for
    additional operations, if necessary. transform_func runs before the
    table is returned.
    """
    def prepare_table():
        if use_table is not None:
            table = use_table
        else:
            with timed_op("Extracting {}...".format(human_readable)):
                table = petl.fromdb(connect_source(auth), query)
        with timed_op("Preparing to transform {}...".format(human_readable)):
            table = transform_func(table, *tf_args, **tf_kwargs)
        return table

    table = prepare_table()
    contained_table = MutableContainer(table)
    yield contained_table
    table = contained_table.item
    if show_override:
        print("Head of {}:".format(human_readable))
        print(table.head())
    try:
        load_table(table, human_readable, output_tablename, load_override, progress)
    except pymysql.err.OperationalError:
        print("! Lost connection to source db; reconnecting...")
        table = prepare_table()
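Incidentally, the except handler at the end of mini_etl rebuilds the table after a lost connection but never retries load_table, so the reconnect work is silently discarded. A generic retry-with-fresh-connection pattern looks something like the following sketch. This is illustration only: with_retries, make_connection, and flaky_load are names I am inventing, and sqlite3 stands in for the real MySQL source so the snippet is self-contained.

```python
import sqlite3

def with_retries(operation, make_connection, exceptions, attempts=3):
    """Run operation(conn); on a connection-related error, open a
    fresh connection and try the whole operation again."""
    conn = make_connection()
    for attempt in range(attempts):
        try:
            return operation(conn)
        except exceptions:
            if attempt == attempts - 1:
                raise
            print("! Lost connection; reconnecting...")
            conn = make_connection()

# Demo: sqlite3 standing in for the real database.
calls = {"n": 0}

def make_connection():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.execute("INSERT INTO t VALUES (42)")
    return conn

def flaky_load(conn):
    calls["n"] += 1
    if calls["n"] == 1:
        conn.close()  # simulate the server dropping the connection
    return conn.execute("SELECT x FROM t").fetchone()[0]

result = with_retries(flaky_load, make_connection, sqlite3.ProgrammingError)
assert result == 42      # first attempt failed, second succeeded
```

The key difference from the handler above is that the load itself is inside the retry loop, not just the table preparation.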
Lines 273-276:
with mini_etl('work table', '', linked_table.WORK_OUT_TABLE,
              linked_table.create, [connect_dest(auth, destination), 'work', user_ids],
              use_table=normal_users):
    pass
Lines 379-400:
if __name__ == '__main__':
    auth = getauth()
    destinations = list(auth['destination'].keys())

    import argparse
    parser = argparse.ArgumentParser(
        description='Extract, Transform, Load data from old db format to new')
    parser.add_argument('-s', '--show-tables', action='store_true',
                        help='Display the heads of various tables as they are processed.')
    parser.add_argument('-l', '--load', action='store_true',
                        help='Actually load the transformed data into the destination database. '
                             'Note that this eliminates any data currently stored there!')
    parser.add_argument('--no-timer', action='store_false', dest='timer',
                        help='Disable the automatic timing of these actions.')
    parser.add_argument('-d', '--destination', choices=destinations, default='default',
                        help='Choose the destination database into which to load the transformed '
                             'data. Sourced from the sub-keys of the `destination.XXX` tables '
                             'of `auth.toml`.')
    args = parser.parse_args()

    etl(args.destination, args.show_tables, args.load, args.timer)
Sorry, nothing obvious jumps out.
Yes, if anything obvious had jumped out, I would have attempted a solution. I'd hoped that possibly the stack trace would be useful to someone knowledgeable about PETL internals.
I guess the fundamental question here is, if I simply catch the error, is there any way to hand PETL a new connection object to the source db and try the .todb()
function again, without needing to go back and re-do all the intermediate transforms? If so, I can work around this. If not, this becomes much harder to solve.
Petl never calls close() on a connection object. So if the underlying error is caused by trying to read from a closed connection, and you are indeed passing a DB-API connection object to fromdb(), then it is not obvious how that could happen. I would need a minimal reproducible example to go any further.
You can replace the connection object on a DbView instance (returned by fromdb()), e.g.:
t = etl.fromdb(connection, ...)
t.dbo = another_connection
...however I would recommend trying to understand the cause of the error first.
Hth.
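To make the dbo-swap concrete, here is a minimal, self-contained model of the pattern. Note that LazyQuery is a toy class I am inventing for illustration, not petl's actual DbView, and sqlite3 stands in for MySQL; the point is only that a lazy view which re-executes its query on each iteration can be repaired by replacing the stored connection object.

```python
import sqlite3

class LazyQuery:
    """Toy stand-in for petl's DbView: stores a connection (`dbo`) and a
    query, and re-executes the query each time it is iterated."""
    def __init__(self, dbo, query):
        self.dbo = dbo
        self.query = query
    def __iter__(self):
        cursor = self.dbo.cursor()
        cursor.execute(self.query)
        yield from cursor

# Set up a throwaway database and a lazy view over it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])
view = LazyQuery(conn, "SELECT x FROM t ORDER BY x")
assert [r[0] for r in view] == [1, 2, 3]

conn.close()  # simulate the server dropping the connection

try:
    list(view)  # iterating now fails: the stored connection is dead
except sqlite3.ProgrammingError:
    pass

# Swap in a fresh connection, analogous to `t.dbo = another_connection`.
conn2 = sqlite3.connect(":memory:")
conn2.execute("CREATE TABLE t (x INTEGER)")
conn2.executemany("INSERT INTO t VALUES (?)", [(4,), (5,)])
view.dbo = conn2
assert [r[0] for r in view] == [4, 5]
```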
Hi, I am getting the same error:
pymysql.err.InterfaceError: (0, '')
code:
import petl
import pymysql
query = 'select * from columns where table_schema in ("employees");'
connection = pymysql.connect(host='localhost', port=3306, user='root', password='test', db='information_schema')
table = petl.fromdb(connection, query)
new_table = petl.cut(table, 'TABLE_SCHEMA')
print(new_table)
error:
pymysql.err.InterfaceError: (0, '')
I am getting the same error when I fetch all the rows, about 420,000:
In [34]: table
Traceback (most recent call last):
File "d:\ProgramData\Anaconda3\lib\site-packages\IPython\core\formatters.py", line 702, in __call__
printer.pretty(obj)
File "d:\ProgramData\Anaconda3\lib\site-packages\IPython\lib\pretty.py", line 405, in pretty
return _repr_pprint(obj, self, cycle)
File "d:\ProgramData\Anaconda3\lib\site-packages\IPython\lib\pretty.py", line 695, in _repr_pprint
output = repr(obj)
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\util\vis.py", line 135, in _table_repr
return str(look(table))
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\util\vis.py", line 104, in __repr__
table, overflow = _vis_overflow(self.table, self.limit)
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\util\vis.py", line 528, in _vis_overflow
table = list(islice(table, 0, limit+2))
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\io\db.py", line 156, in _iter_dbapi_connection
for row in _iter_dbapi_cursor(cursor, query, *args, **kwargs):
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\io\db.py", line 163, in _iter_dbapi_cursor
cursor.execute(query, *args, **kwargs)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\cursors.py", line 170, in execute
result = self._query(query)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\cursors.py", line 328, in _query
conn.query(q)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\connections.py", line 517, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\connections.py", line 732, in _read_query_result
result.read()
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\connections.py", line 1075, in read
first_packet = self.connection._read_packet()
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\connections.py", line 671, in _read_packet
% (packet_number, self._next_seq_id))
InternalError: Packet sequence number wrong - got 139 expected 1
Traceback (most recent call last):
File "d:\ProgramData\Anaconda3\lib\site-packages\IPython\core\formatters.py", line 345, in __call__
return method()
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\util\vis.py", line 549, in _display_html
table, overflow = _vis_overflow(table, limit)
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\util\vis.py", line 528, in _vis_overflow
table = list(islice(table, 0, limit+2))
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\io\db.py", line 156, in _iter_dbapi_connection
for row in _iter_dbapi_cursor(cursor, query, *args, **kwargs):
File "d:\ProgramData\Anaconda3\lib\site-packages\petl\io\db.py", line 163, in _iter_dbapi_cursor
cursor.execute(query, *args, **kwargs)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\cursors.py", line 170, in execute
result = self._query(query)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\cursors.py", line 328, in _query
conn.query(q)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\connections.py", line 516, in query
self._execute_command(COMMAND.COM_QUERY, sql)
File "d:\ProgramData\Anaconda3\lib\site-packages\pymysql\connections.py", line 750, in _execute_command
raise err.InterfaceError("(0, '')")
InterfaceError: (0, '')
but when I limit the rows to 1000, or use
df = etl.todataframe(table)
the error goes away.
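A plausible reading of that last observation (my speculation, not confirmed by the petl authors): displaying the table re-executes the query against the flaky connection every time, while todataframe pulls all rows into memory exactly once. A generic materialization helper gives the same effect; the names here are mine, and the generator merely simulates a database source so the sketch is runnable.

```python
from itertools import islice

def materialize(lazy_rows, limit=None):
    """Consume a lazy row source exactly once and keep the rows in
    memory, so later inspection never re-opens a database cursor.
    `limit` bounds memory use for very large tables."""
    it = iter(lazy_rows)
    return list(it if limit is None else islice(it, limit))

calls = {"n": 0}

def rows_from_db():
    # stand-in for petl.fromdb(...): a generator that would normally
    # hit the database every time it is iterated
    calls["n"] += 1
    yield from [("id", "name"), (1, "a"), (2, "b")]

rows = materialize(rows_from_db())
assert rows == [("id", "name"), (1, "a"), (2, "b")]
_ = rows[:2]; _ = rows[:2]  # inspect repeatedly...
assert calls["n"] == 1      # ...the "query" ran only once
```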
This error is apparently caused by attempted use of a connection or cursor which has already been closed, per StackOverflow. My code doesn't handle any cursors at all; all DB access is managed through PETL, and a new connection is created each time .todb() or .fromdb() is called. The source DB is RDS/Aurora; the target is sometimes sqlite3, sometimes RDS/Postgres. That the error comes via pymysql indicates that the previously-closed DB is the source, not the target.
Full stack trace follows: