pandas-dev / pandas

Flexible and powerful data analysis / manipulation library for Python, providing labeled data structures similar to R data.frame objects, statistical functions, and much more
https://pandas.pydata.org
BSD 3-Clause "New" or "Revised" License

pd.read_sql_query with chunksize: pandasSQL_builder should only be called when first chunk is requested #19457

Open leezu opened 6 years ago

leezu commented 6 years ago

Code Sample

        pool = mp.Pool(processes=None, initializer=_initialize_mp_preprocess)
        for vals in pool.imap_unordered(self._preprocess_chunk,
                                        pd.read_sql_query(
                                            query,
                                            url,
                                            chunksize=nlp_chunksize)):

Problem description

Using pd.read_sql_query with chunksize and SQLite together with the multiprocessing module currently fails. pandasSQL_builder is called as soon as pd.read_sql_query is executed, but the multiprocessing module requests the chunks from a different thread. The resulting SQLite connection may only be used in the thread where it was created, so an exception is raised.
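The underlying restriction can be reproduced with only the standard library. This sketch is an illustration, not pandas code. It opens a sqlite3 connection and executes the query eagerly in the calling thread, which mirrors what is described above for pd.read_sql_query. It then fetches the next batch of rows from a second thread. Under the default check_same_thread=True, sqlite3 raises ProgrammingError. The helper name `fetch_in_other_thread` is hypothetical.

```python
import sqlite3
import threading


def fetch_in_other_thread():
    """Execute a query in this thread, then fetch rows from another thread.

    Returns the exception raised in the worker thread, or None.
    """
    # Connection and cursor are created eagerly in the calling thread,
    # analogous to the query being executed when read_sql_query is called.
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE test (x INTEGER)")
    con.executemany("INSERT INTO test VALUES (?)", [(i,) for i in range(10)])
    cur = con.execute("SELECT x FROM test")

    errors = []

    def worker():
        try:
            # Pull the next "chunk" from a thread that did not create the connection.
            cur.fetchmany(2)
        except sqlite3.ProgrammingError as exc:
            errors.append(exc)

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    con.close()  # closing from the creating thread is allowed
    return errors[0] if errors else None


err = fetch_in_other_thread()
print(type(err).__name__, err)
```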

Workaround

Create a wrapper around pd.read_sql_query that only calls pd.read_sql_query once the first chunk is requested. That way, pandasSQL_builder is called within the thread that requests the chunks.

import attr
import pandas as pd


@attr.s(auto_attribs=True)
class PDSQLQueryWrapper:
    """Wrap the pd.read_sql_query iterator.

    The DB engine is created lazily, in the thread that first iterates over it.

    """

    _read_sql_query_iterator = None

    query: str
    url: str
    chunksize: int

    def __iter__(self):
        return self

    def __next__(self):
        if self._read_sql_query_iterator is None:
            self._read_sql_query_iterator = pd.read_sql_query(
                self.query, self.url, chunksize=self.chunksize)

        return next(self._read_sql_query_iterator)
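The same lazy behaviour can also be sketched with a plain generator function in place of the attrs class, because a generator's body does not start running until the first next() call. This sketch makes two assumptions. First, it uses a local SQLite file. Second, it passes a sqlite3 connection to pd.read_sql_query rather than a SQLAlchemy URL string. `lazy_read_sql_query` is a hypothetical helper, not a pandas API.

```python
import os
import sqlite3
import tempfile
import threading

import pandas as pd


def lazy_read_sql_query(query, db_path, chunksize):
    """Yield DataFrame chunks, connecting only when the first chunk is requested.

    Nothing below runs until the first next() call, so the sqlite3
    connection is created in whichever thread starts iterating.
    """
    con = sqlite3.connect(db_path)
    try:
        yield from pd.read_sql_query(query, con, chunksize=chunksize)
    finally:
        # Runs in the consuming thread once the iterator is exhausted.
        con.close()


# Usage: build a small database, then consume the chunks in another thread.
fd, path = tempfile.mkstemp(suffix=".db")
os.close(fd)
with sqlite3.connect(path) as setup_con:
    pd.DataFrame({"x": range(10)}).to_sql("test", setup_con, index=False)
setup_con.close()

chunks = []
chunk_iter = lazy_read_sql_query("SELECT * FROM test", path, chunksize=4)
t = threading.Thread(target=lambda: chunks.extend(chunk_iter))
t.start()
t.join()
os.remove(path)

print([len(c) for c in chunks])
```

The sketch assumes that one thread consumes the whole iterator. If the generator were closed from a different thread, the con.close() in the finally clause would run into the same same-thread check.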

I believe pandas should play nicely with the Python multiprocessing module by delaying the creation of the SQLite connection, which would fix this issue.

TomAugspurger commented 6 years ago

Could you create a small reproducible example? And can you paste the current traceback?

but the multiprocessing module requests the chunks in a different Thread

You mean different process, right?

Do you have any interest in working on a fix?

leezu commented 6 years ago

Reproducible example:

import tempfile

import numpy as np
import multiprocessing as mp
import pandas as pd

# Find a temporary file to store the database
_, f = tempfile.mkstemp()

# Put some data in the database
url = f"sqlite://{f}"
df = pd.DataFrame.from_records(np.random.normal(size=(100, 100)))
df.to_sql(name="test", con=url)

# Retrieve via the Python multiprocessing module
def process_chunk(chunk):
    pass

pool = mp.Pool()
query = "SELECT * FROM test"
for vals in pool.imap_unordered(process_chunk,
                                pd.read_sql_query(query, url, chunksize=2)):
    pass

You mean different process, right?

No, I mean a different thread: the multiprocessing module retrieves data from the iterator returned by pd.read_sql_query in a separate thread. That data is subsequently passed to a different process, of course, but this bug occurs before it even gets that far ;)
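The thread distinction can be checked directly. This sketch uses multiprocessing.pool.ThreadPool, a subclass of Pool that shares its task-handling machinery, so no worker processes are needed. The generator records whether each item is produced in the main thread. In CPython, imap_unordered hands the iterable to an internal task-handler thread that pulls the items, so every recorded value should be False. `tracked` and `identity` are hypothetical helpers.

```python
import threading
from multiprocessing.pool import ThreadPool


def tracked(n, seen):
    """Yield 0..n-1, recording whether each item is produced in the main thread."""
    for i in range(n):
        seen.append(threading.current_thread() is threading.main_thread())
        yield i


def identity(x):
    return x


seen = []
with ThreadPool(2) as pool:
    # Consume all results before the pool is terminated on exiting the block.
    results = sorted(pool.imap_unordered(identity, tracked(5, seen)))

print(results, seen)
```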

Do you have any interest in working on a fix?

I'm interested, but won't have time before the end of February, so if someone else finds time to work on this I wouldn't mind. In the meantime, the workaround detailed in the first post works.

leezu commented 6 years ago

And this is the traceback


/home/leonard/.local/lib64/python3.6/site-packages/matplotlib/colors.py:298: MatplotlibDeprecationWarning: The is_string_like function was deprecated in version 2.1.
  if cbook.is_string_like(arg):
Error closing cursor
Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/result.py", line 1159, in fetchmany
    l = self.process_rows(self._fetchmany_impl(size))
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/result.py", line 1076, in _fetchmany_impl
    return self.cursor.fetchmany(size)
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1288, in _safe_close_cursor
    cursor.close()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584
Exception during reset or similar
Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/result.py", line 1159, in fetchmany
    l = self.process_rows(self._fetchmany_impl(size))
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/result.py", line 1076, in _fetchmany_impl
    return self.cursor.fetchmany(size)
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 704, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1406, in _handle_dbapi_exception
    self._autorollback()
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 76, in __exit__
    compat.reraise(type_, value, traceback)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
    raise value
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1406, in _handle_dbapi_exception
    self._autorollback()
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 827, in _autorollback
    self._root._rollback_impl()
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 706, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1334, in _handle_dbapi_exception
    exc_info
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 704, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584 (Background on this error at: http://sqlalche.me/e/f405)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/pool.py", line 703, in _finalize_fairy
    fairy._reset(pool)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/pool.py", line 873, in _reset
    pool._dialect.do_rollback(self)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584
Exception closing connection <sqlite3.Connection object at 0x7f6ed4b368f0>
Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/result.py", line 1159, in fetchmany
    l = self.process_rows(self._fetchmany_impl(size))
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/result.py", line 1076, in _fetchmany_impl
    return self.cursor.fetchmany(size)
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 704, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1406, in _handle_dbapi_exception
    self._autorollback()
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/langhelpers.py", line 76, in __exit__
    compat.reraise(type_, value, traceback)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 187, in reraise
    raise value
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1406, in _handle_dbapi_exception
    self._autorollback()
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 827, in _autorollback
    self._root._rollback_impl()
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 706, in _rollback_impl
    self._handle_dbapi_exception(e, None, None, None, None)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 1334, in _handle_dbapi_exception
    exc_info
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/util/compat.py", line 186, in reraise
    raise value.with_traceback(tb)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/base.py", line 704, in _rollback_impl
    self.engine.dialect.do_rollback(self.connection)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584 (Background on this error at: http://sqlalche.me/e/f405)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/pool.py", line 703, in _finalize_fairy
    fairy._reset(pool)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/pool.py", line 873, in _reset
    pool._dialect.do_rollback(self)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 457, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/pool.py", line 317, in _close_connection
    self._dialect.do_close(connection)
  File "/home/leonard/.local/lib64/python3.6/site-packages/sqlalchemy/engine/default.py", line 463, in do_close
    dbapi_connection.close()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584
Traceback (most recent call last):
  File "pandas-mp-sql.py", line 23, in <module>
    pd.read_sql_query(query, url, chunksize=2)):
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 735, in next
    raise value
sqlalchemy.exc.ProgrammingError: (sqlite3.ProgrammingError) SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140114398058240 and this is thread id 140114281891584 (Background on this error at: http://sqlalche.me/e/f405)

TomAugspurger commented 6 years ago

@leezu I get a different error with your example:

```
import tempfile

import numpy as np
import multiprocessing as mp
import pandas as pd

# Find a temporary file to store the database
_, f = tempfile.mkstemp()

# Put some data in the database
url = f"sqlite://{f}"
df = pd.DataFrame.from_records(np.random.normal(size=(100, 100)))
df.to_sql(name="test", con=url)

# Retrieve via the Python multiprocessing module
def process_chunk(chunk):
    pass

pool = mp.Pool()
query = "SELECT * FROM test"
for vals in pool.imap_unordered(process_chunk,
                                pd.read_sql_query(query, url, chunksize=2)):
    pass

## -- End pasted text --
---------------------------------------------------------------------------
OperationalError                          Traceback (most recent call last)
~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _wrap_pool_connect(self, fn, connection)
   2146 try:
-> 2147 return fn()
   2148 except dialect.dbapi.Error as e:

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in connect(self)
    386 if not self._use_threadlocal:
--> 387 return _ConnectionFairy._checkout(self)
    388

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in _checkout(cls, pool, threadconns, fairy)
    765 if not fairy:
--> 766 fairy = _ConnectionRecord.checkout(pool)
    767

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in checkout(cls, pool)
    515 def checkout(cls, pool):
--> 516 rec = pool._do_get()
    517 try:

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in _do_get(self)
   1228 def _do_get(self):
-> 1229 return self._create_connection()
   1230

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in _create_connection(self)
    332
--> 333 return _ConnectionRecord(self)
    334

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in __init__(self, pool, connect)
    460 if connect:
--> 461 self.__connect(first_connect_check=True)
    462 self.finalize_callback = deque()

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in __connect(self, first_connect_check)
    650 self.starttime = time.time()
--> 651 connection = pool._invoke_creator(self)
    652 pool.logger.debug("Created new connection %r", connection)

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py in connect(connection_record)
    104 return connection
--> 105 return dialect.connect(*cargs, **cparams)
    106

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/default.py in connect(self, *cargs, **cparams)
    392 def connect(self, *cargs, **cparams):
--> 393 return self.dbapi.connect(*cargs, **cparams)
    394

OperationalError: unable to open database file

The above exception was the direct cause of the following exception:

OperationalError                          Traceback (most recent call last)
in ()
     11 url = f"sqlite://{f}"
     12 df = pd.DataFrame.from_records(np.random.normal(size=(100, 100)))
---> 13 df.to_sql(name="test", con=url)
     14
     15

~/sandbox/pandas/pandas/core/generic.py in to_sql(self, name, con, schema, if_exists, index, index_label, chunksize, dtype)
   2125 sql.to_sql(self, name, con, schema=schema, if_exists=if_exists,
   2126 index=index, index_label=index_label, chunksize=chunksize,
-> 2127 dtype=dtype)
   2128
   2129 def to_pickle(self, path, compression='infer',

~/sandbox/pandas/pandas/io/sql.py in to_sql(frame, name, con, schema, if_exists, index, index_label, chunksize, dtype)
    448 pandas_sql.to_sql(frame, name, if_exists=if_exists, index=index,
    449 index_label=index_label, schema=schema,
--> 450 chunksize=chunksize, dtype=dtype)
    451
    452

~/sandbox/pandas/pandas/io/sql.py in to_sql(self, frame, name, if_exists, index, index_label, schema, chunksize, dtype)
   1124 if_exists=if_exists, index_label=index_label,
   1125 schema=schema, dtype=dtype)
-> 1126 table.create()
   1127 table.insert(chunksize)
   1128 if (not name.isdigit() and not name.islower()):

~/sandbox/pandas/pandas/io/sql.py in create(self)
    559
    560 def create(self):
--> 561 if self.exists():
    562 if self.if_exists == 'fail':
    563 raise ValueError("Table '%s' already exists." % self.name)

~/sandbox/pandas/pandas/io/sql.py in exists(self)
    547
    548 def exists(self):
--> 549 return self.pd_sql.has_table(self.name, self.schema)
    550
    551 def sql_schema(self):

~/sandbox/pandas/pandas/io/sql.py in has_table(self, name, schema)
   1152 self.connectable.dialect.has_table,
   1153 name,
-> 1154 schema or self.meta.schema,
   1155 )
   1156

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/base.py in run_callable(self, callable_, *args, **kwargs)
   2042
   2043 """
-> 2044 with self.contextual_connect() as conn:
   2045 return conn.run_callable(callable_, *args, **kwargs)
   2046

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/base.py in contextual_connect(self, close_with_result, **kwargs)
   2110 return self._connection_cls(
   2111 self,
-> 2112 self._wrap_pool_connect(self.pool.connect, None),
   2113 close_with_result=close_with_result,
   2114 **kwargs)

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _wrap_pool_connect(self, fn, connection)
   2149 if connection is None:
   2150 Connection._handle_dbapi_exception_noconnection(
-> 2151 e, dialect, self)
   2152 else:
   2153 util.reraise(*sys.exc_info())

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _handle_dbapi_exception_noconnection(cls, e, dialect, engine)
   1463 util.raise_from_cause(
   1464 sqlalchemy_exception,
-> 1465 exc_info
   1466 )
   1467 else:

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/util/compat.py in raise_from_cause(exception, exc_info)
    201 exc_type, exc_value, exc_tb = exc_info
    202 cause = exc_value if exc_value is not exception else None
--> 203 reraise(type(exception), exception, tb=exc_tb, cause=cause)
    204
    205 if py3k:

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/util/compat.py in reraise(tp, value, tb, cause)
    184 value.__cause__ = cause
    185 if value.__traceback__ is not tb:
--> 186 raise value.with_traceback(tb)
    187 raise value
    188

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/base.py in _wrap_pool_connect(self, fn, connection)
   2145 dialect = self.dialect
   2146 try:
-> 2147 return fn()
   2148 except dialect.dbapi.Error as e:
   2149 if connection is None:

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in connect(self)
    385 """
    386 if not self._use_threadlocal:
--> 387 return _ConnectionFairy._checkout(self)
    388
    389 try:

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in _checkout(cls, pool, threadconns, fairy)
    764 def _checkout(cls, pool, threadconns=None, fairy=None):
    765 if not fairy:
--> 766 fairy = _ConnectionRecord.checkout(pool)
    767
    768 fairy._pool = pool

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in checkout(cls, pool)
    514 @classmethod
    515 def checkout(cls, pool):
--> 516 rec = pool._do_get()
    517 try:
    518 dbapi_connection = rec.get_connection()

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in _do_get(self)
   1227
   1228 def _do_get(self):
-> 1229 return self._create_connection()
   1230
   1231 def recreate(self):

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in _create_connection(self)
    331 """Called by subclasses to create a new ConnectionRecord."""
    332
--> 333 return _ConnectionRecord(self)
    334
    335 def _invalidate(self, connection, exception=None):

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in __init__(self, pool, connect)
    459 self.__pool = pool
    460 if connect:
--> 461 self.__connect(first_connect_check=True)
    462 self.finalize_callback = deque()
    463

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/pool.py in __connect(self, first_connect_check)
    649 try:
    650 self.starttime = time.time()
--> 651 connection = pool._invoke_creator(self)
    652 pool.logger.debug("Created new connection %r", connection)
    653 self.connection = connection

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/strategies.py in connect(connection_record)
    103 if connection is not None:
    104 return connection
--> 105 return dialect.connect(*cargs, **cparams)
    106
    107 creator = pop_kwarg('creator', connect)

~/Envs/pandas-dev/lib/python3.6/site-packages/sqlalchemy/engine/default.py in connect(self, *cargs, **cparams)
    391
    392 def connect(self, *cargs, **cparams):
--> 393 return self.dbapi.connect(*cargs, **cparams)
    394
    395 def create_connect_args(self, url):

OperationalError: (sqlite3.OperationalError) unable to open database file
```
leezu commented 6 years ago

@TomAugspurger I just reran the example with Python 3.6.5 on Linux and could still reproduce the "SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140612946281792 and this is thread id 140612829673216" error. Are you running on a different system?

TomAugspurger commented 6 years ago

Yep, macOS. Must be platform-dependent.

On Wed, Jun 27, 2018 at 11:08 AM, Leonard Lausen notifications@github.com wrote:

@TomAugspurger https://github.com/TomAugspurger I just reran the example with Python 3.6.5 on Linux and could still reproduce the SQLite objects created in a thread can only be used in that same thread.The object was created in thread id 140612946281792 and this is thread id 140612829673216 error. Are you running on a different system?


leezu commented 6 years ago

Adding an extra / to the URL in line 11 allows me to reproduce the issue on OS X, i.e. url = f"sqlite:///{f}" instead of url = f"sqlite://{f}".
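SQLAlchemy's SQLite URL convention explains why the extra slash matters. In sqlite:///foo.db the path is relative to the working directory, while sqlite:////abs/path.db is an absolute path. tempfile.mkstemp returns an absolute path, so the example needs the three-slash prefix followed by that path. The hypothetical stdlib-only helper below makes this explicit and assumes POSIX-style paths.

```python
import os


def sqlite_url(path):
    """Build a SQLAlchemy-style SQLite URL for a filesystem path.

    The text after "sqlite:///" is taken as the file path, so an absolute
    POSIX path such as /tmp/x.db ends up with four slashes in total.
    """
    return f"sqlite:///{os.path.abspath(path)}"


print(sqlite_url("/tmp/test.db"))
```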

TomAugspurger commented 6 years ago

Nice catch, confirmed that it raises for me with the same error you get.

On Wed, Jun 27, 2018 at 2:13 PM, Leonard Lausen notifications@github.com wrote:

Adding an extra / to the url in line 11 should allow to reproduce the issue on OS X. Ie. url = f"sqlite:///{f}" instead of url = f"sqlite://{f}"


nimasasa commented 6 years ago

I am trying to pass the connection and use the wrapper, but with imap_unordered my kernel dies immediately. With pool.map it does not read in parallel. Here is how I am doing it:

import multiprocessing as mp

import attr
import pandas as pd


@attr.s(auto_attribs=True)
class PDSQLQueryWrapper(object):
    """Wrap the iterator.

    To create the db engine in the thread that calls the iterator first.

    """

    _read_sql_query_iterator = None

    query = $my_query$
    connection = $my_conn$
    chunksize = CHUNKSIZE

    def __iter__(self):
        return self

    def __next__(self):
        if self._read_sql_query_iterator is None:
            self._read_sql_query_iterator = pd.read_sql_query(
                self.query, self.connection, chunksize=self.chunksize)

        return next(self._read_sql_query_iterator)


##########################
read_sql = PDSQLQueryWrapper()
dfs = []


def process_chunk(chunk):
    return chunk


pool = mp.Pool()
for vals in pool.map(process_chunk, read_sql):
    dfs.append(vals)

Is there any way I can fix this?

lapcchan commented 1 year ago
pd.read_sql(query, "sqlite:///sqlite.db?check_same_thread=False",chunksize=10000)
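The URL above relies on SQLAlchemy's pysqlite dialect passing the check_same_thread query parameter on to sqlite3.connect. The sketch below uses that same DBAPI option directly with the standard-library sqlite3 module. A cursor created in the main thread is read from a worker thread without raising. According to the sqlite3 documentation, check_same_thread=False allows the connection to be used from multiple threads, but writes may need to be serialized by the user. This sketch only ever touches the connection from one thread at a time.

```python
import sqlite3
import threading

# check_same_thread=False lifts sqlite3's "same thread" restriction; the
# caller is then responsible for not using the connection concurrently.
con = sqlite3.connect(":memory:", check_same_thread=False)
con.execute("CREATE TABLE test (x INTEGER)")
con.executemany("INSERT INTO test VALUES (?)", [(i,) for i in range(5)])
cur = con.execute("SELECT x FROM test ORDER BY x")

rows = []
# Fetch from a different thread than the one that created the connection.
t = threading.Thread(target=lambda: rows.extend(cur.fetchmany(3)))
t.start()
t.join()
con.close()

print(rows)
```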