oracle / python-oracledb

Python driver for Oracle Database conforming to the Python DB API 2.0 specification. This is the renamed, new major release of cx_Oracle
https://oracle.github.io/python-oracledb

invalid dpiStmt handle #143

Closed lime-n closed 1 year ago

lime-n commented 1 year ago

What versions are you using?

oracledb==1.2.1

Give your database version.

Oracle developer instant client 21

platform.platform: Windows-10-10.0.19044-SP0
sys.maxsize > 2**32: True
platform.python_version: 3.9.13
oracledb.__version__: 1.2.1

  1. Is it an error or a hang or a crash? Error

  2. What error(s) or behavior you are seeing?

cx_Oracle.DatabaseError: DPI-1002: invalid dpiStmt handle

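# Imports were omitted from the original post; these are the ones the script appears to need (assumed)
import os
import platform
import threading
import time

import oracledb
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

username = os.environ.get('USERNAME')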
password = os.environ.get('PASSWORD')

if platform.system() == "Windows":
    client = ''
    for path in os.environ.get('PATH').split(';'):
        if 'instantclient_21' in path:
            client = path
    oracledb.init_oracle_client(lib_dir = client)
else:
    oracledb.init_oracle_client()

def start_workload(fn):
    def wrapped(self, threads, *args, **kwargs):
        assert isinstance(threads, int)
        assert threads > 0

        ts = []
        for i in range(threads):
            new_args = (self, i, *args)
            t = threading.Thread(target=fn, args=new_args, kwargs=kwargs)
            t.start()
            ts.append(t)
        for t in ts:
            t.join()
    return wrapped

class threadedEngine(Engine):

    def __init__(self, batchsize, maxrows, *args):
        print('oracle://%s:%s@' % (args))
        self._ENGINE =  create_engine('oracle://%s:%s@localhost:1521/?service_name=my_service_name' % (args), pool_pre_ping = True) 
        self._connection = self._ENGINE.raw_connection()
        self._cursor = self._connection.cursor()
        self._batchsize = batchsize
        self._maxrows = maxrows

    @start_workload    
    def table(self,tn, table = None, query = None, chunksize: bool = False):
        max_rows = self._maxrows
        row_iter = int(max_rows/self._batchsize)
        self._cursor.arraysize = 10000
        self._cursor.prefetchrows = 1000000
        if query is None and table is not None:
            start = time.time()
            self._cursor.execute(f"SELECT /*+ ENABLE_PARALLEL_DML PARALLEL(AUTO) */ * FROM {table} offset :rowoffset rows fetch next :maxrows rows only", rowoffset = (tn*row_iter), maxrows = row_iter)
            columns = [col[0] for col in self._cursor.description]
            self._cursor.rowfactory = lambda *args: dict(zip(columns, args))
            result = pd.DataFrame(self._cursor.fetchall())
            end = time.time()
            print("Total Time: %s" % (end-start))
            return result

I run it with the following:

NUM_THREADS = 12
MAX_ROWS = 1200000

if __name__ == '__main__':
    oracle = threadedEngine(NUM_THREADS,MAX_ROWS,username, password)
    oracle.table(NUM_THREADS, 'LARGE_TABLE')

cjbj commented 1 year ago

When you get this working I would love to see your performance results to see if splitting the data fetch like this is actually beneficial, or whether there is a cost to sorting the data in the DB (vs just doing a full table scan in a single thread), and whether Python threading, or any post-fetch data manipulation costs outweigh any benefits. The cross-over point (if any) of a single connection & query vs parallel fetches would be interesting to know.

You can check my suspicion about connection reuse by querying SELECT SYS_CONTEXT('USERENV','SID') FROM DUAL in each thread. If each thread reports the same number, you will need to rethink your plan and either drop down to pure python-oracledb or use the session interface in SQLAlchemy (probably with a connection pool). This is about the fourth question I've seen on parallel data fetches in the last few days. There is a quick example I put together showing multiple connections at https://stackoverflow.com/questions/75351686/read-sql-queries-via-pandas-quickly-with-pyoracle/75419536#75419536 and a SQLAlchemy 2.0 example of connection pooling at https://github.com/cjbj/python-oracledb-demos-2022/blob/main/6_sqlalchemy_example.py
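
The session check suggested in this comment could look roughly like the sketch below. It is not code from the thread: `collect_session_ids`, `shares_a_session`, and the `get_connection` callable are hypothetical names, and `get_connection` is assumed to return whatever DB-API connection each thread would really use (for example, the shared `raw_connection()` from the original script).

```python
import threading

SID_QUERY = "SELECT SYS_CONTEXT('USERENV','SID') FROM DUAL"


def collect_session_ids(get_connection, num_threads):
    """Run SID_QUERY once per thread and return {thread_index: sid}.

    get_connection is a hypothetical zero-argument callable that returns
    the DB-API connection the given thread would normally use.
    """
    sids = {}
    lock = threading.Lock()

    def worker(index):
        conn = get_connection()
        cursor = conn.cursor()  # each thread opens its own cursor
        try:
            cursor.execute(SID_QUERY)
            (sid,) = cursor.fetchone()
        finally:
            cursor.close()
        with lock:
            sids[index] = sid

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sids


def shares_a_session(sids):
    """True if at least two threads reported the same session id."""
    return len(set(sids.values())) < len(sids)
```

If `shares_a_session(collect_session_ids(...))` is true, at least two threads are going through the same database session, so their queries cannot run in parallel.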

cjbj commented 1 year ago

After a bit of testing, my thesis may be wrong, but the problem still smells like a threading issue at the app layer.

anthony-tuininga commented 1 year ago

This is a bug in your code. You cannot perform concurrent queries on a single connection. You would have to have multiple connections -- one for each thread -- in order to perform these in parallel. If you had multiple cursors you would at least not run into the problem you are running into -- but it still wouldn't work in parallel!
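
A minimal sketch of the one-connection-per-thread approach described in this comment, written against the generic DB-API rather than any one setup. The helper names and the `connect` callable are hypothetical. With python-oracledb, `connect` could be `pool.acquire` on a pool created with `oracledb.create_pool(...)`, in which case `close()` returns the connection to the pool. The SQL is assumed to carry the `:rowoffset` and `:maxrows` binds from the original script, ideally with an ORDER BY so the slices are stable.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_chunk(connect, sql, rowoffset, maxrows, arraysize=1000):
    """Fetch one slice of rows on a connection owned by the calling thread."""
    conn = connect()  # a separate connection per call, never shared
    try:
        cursor = conn.cursor()
        cursor.arraysize = arraysize
        cursor.execute(sql, {"rowoffset": rowoffset, "maxrows": maxrows})
        return cursor.fetchall()
    finally:
        conn.close()


def fetch_in_parallel(connect, sql, total_rows, num_threads):
    """Split total_rows into num_threads slices and fetch them concurrently."""
    chunk = -(-total_rows // num_threads)  # ceiling division
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        futures = [
            executor.submit(fetch_chunk, connect, sql, i * chunk, chunk)
            for i in range(num_threads)
        ]
        # Results come back in slice order, one list of rows per thread
        return [f.result() for f in futures]
```

Whether this is faster than a single full-table fetch is exactly the open question raised earlier in the thread, so it is worth timing both.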

stale[bot] commented 1 year ago

This issue has been automatically marked as inactive because it has not been updated recently. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] commented 1 year ago

This issue has been automatically closed because it has not been updated for a month.

lime-n commented 10 months ago

I have modified my script to be asynchronous, at least to the extent that the query output is fetched from the database concurrently.

import asyncio
import os
import pandas as pd
import cx_Oracle_async
import logging
import timeit

username = os.environ.get('USERNAME')
password = os.environ.get('PASSWORD')
DB_HOST = "hostname"
DB_PORT = "1521"
DB_SERVICE = "service"

# Configure logging
logging.basicConfig(level=logging.DEBUG)

class ThreadedEngine:
    def __init__(self, batchsize, maxrows, username, password, num_pools=3):
        self._batchsize = batchsize
        self._maxrows = maxrows
        self._username = username
        self._password = password
        self._num_pools = num_pools
        self._pools = []

    async def init_engines(self):
        for _ in range(self._num_pools):
            dsn = cx_Oracle_async.makedsn(DB_HOST, 1521, service_name=DB_SERVICE)
            pool = await cx_Oracle_async.create_pool(
                user=self._username,
                password=self._password,
                dsn=dsn,
                min=1,
                max=10,
                encoding="UTF-8",
                threaded=True,
                events=False
                )
            self._pools.append(pool)

    async def fetch_batch(self, pool, query, offset, maxrows):
        retry_attempts = 3  # Set the number of retry attempts
        current_attempt = 0
        complete_table = False

        while current_attempt < retry_attempts:
            try:
                async with pool.acquire() as conn:
                    async with conn.cursor() as cursor:
                        cursor.arraysize = 100000
                        await cursor.execute(
                            f"{query} OFFSET :rowoffset ROWS FETCH NEXT :maxrows ROWS ONLY",
                            rowoffset=offset, maxrows=maxrows)
                        columns = [col[0] for col in cursor.description]
                        rows = await cursor.fetchmany(maxrows)
                        complete_table = True
                        return pd.DataFrame(rows, columns=columns)
            except Exception as e:
                # Retry only when the error message contains 'DPI-1001' (out of memory)
                if 'DPI-1001' in str(e):
                    logging.warning(f"Running out of memory. Retrying the pool ({current_attempt + 1}/{retry_attempts}).")
                    current_attempt += 1
                    await asyncio.sleep(5)  # Add a delay before retrying
                else:
                    logging.error(f"Error fetching batch: {e}")
                    return pd.DataFrame()
            # The offset is left unchanged here so that a retry requests the same batch again

        if not complete_table:
            logging.error(f"Failed after {retry_attempts} attempts. Could not retrieve batch from pool.")
            return pd.DataFrame()
        else:
            logging.info(f"Successfully retrieved complete table from pool after {current_attempt} retry attempts.")

    async def table(self, tn, table=None, offset=0):
        await self.init_engines()

        total_rows = self._maxrows
        pool_batches = self._batchsize // self._num_pools

        start_time = timeit.default_timer()

        while offset < total_rows:
            tasks = []
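            for i, pool in enumerate(self._pools):
                # Give each pool its own slice so the pools do not all fetch the same rows
                tasks.append(self.fetch_batch(pool, table, offset + i * pool_batches, pool_batches))

            try:
                results = await asyncio.gather(*tasks)
            except Exception as e:
                logging.error(f"Error during concurrent execution: {e}")
                results = []

            for result in results:
                print(result)

            # Advance by the rows actually requested in this round, which avoids
            # gaps when batchsize is not divisible by num_pools
            offset += pool_batches * len(self._pools)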

        elapsed_time = timeit.default_timer() - start_time
        print(f"Total time elapsed: {elapsed_time} seconds")

# Usage
async def main():
    engine = ThreadedEngine(50000, 1000000, username, password, num_pools=3)
    await engine.table(0, table='select  /*+ PARALLEL(cmp_product, 4, 20) */ * from bi.cmp_product')

if __name__ == "__main__":
    asyncio.run(main())

How can I reduce the memory usage that leads to DPI-1001?

The above uses cx_Oracle_async, which integrates oracledb and builds async methods around it. This is temporary until oracledb supports async features with SQLAlchemy. Once that feature is out, I will update the script and compare speed.

anthony-tuininga commented 10 months ago

I would set the array size lower. There is very little advantage to setting it to 100,000 unless you have very high network latency. A few thousand works just as effectively with considerably less memory usage! Thin mode uses even less memory. :-) As for asyncio support in python-oracledb, I have a working prototype so hopefully that feature will be available fairly soon!
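
To get a feel for why the array size matters, here is a back-of-the-envelope estimate. It assumes, as a simplification, that fetch buffer memory grows roughly with array size × bytes per row × number of cursors fetching at once. The function name and the 2,000 bytes per row figure are hypothetical, and real driver allocation will differ in detail.

```python
def estimate_fetch_buffer_bytes(arraysize, bytes_per_row, concurrent_cursors=1):
    """Rough estimate only: rows buffered per fetch times row width times cursors."""
    return arraysize * bytes_per_row * concurrent_cursors


# Hypothetical row width of 2,000 bytes, with three cursors fetching at once
large = estimate_fetch_buffer_bytes(100_000, 2_000, concurrent_cursors=3)
small = estimate_fetch_buffer_bytes(2_000, 2_000, concurrent_cursors=3)

print(f"arraysize=100000: ~{large / 1e6:.0f} MB")  # ~600 MB
print(f"arraysize=2000:   ~{small / 1e6:.0f} MB")  # ~12 MB
```

Under these assumptions, dropping the array size from 100,000 to 2,000 cuts the estimate by a factor of 50.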