long2ice / asynch

An asyncio ClickHouse Python Driver with native (TCP) interface support.
https://github.com/long2ice/asynch
Apache License 2.0

Connection Pool thread safety, secure connection, SSL, gather? #86

Open devMarlee opened 1 year ago

devMarlee commented 1 year ago

Hello @long2ice and contributors, I am implementing a connection pool in my Flask application. I insert a lot of files into ClickHouse at once and then wait a long time before the next batch, so I create a connection pool in each request and close it when the request is done.

My Flask app is running in Gunicorn with multiple threads.

However, I am getting an error like:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
2023-11-17 08:27:46,675 - ERROR - Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x4036c5c8b0>
transport: <_SelectorSocketTransport closing fd=26>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 910, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/asyncio/sslproto.py", line 685, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 916, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 711, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 723, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

I only get this error from the second request onwards (not on the first request). I am wondering whether the problem is related to thread safety, SSL connections, or perhaps the use of asyncio.gather with the connection pool.

My route is like this:

@some_route("/route_name", methods=["POST"])
async def insert():
    # some inits here
    response_list = []
    list_of_files = [...]  # some list of files
    async with create_pool(minsize=50, maxsize=100, host=host, port=port, user=username, password=pw, secure=True) as pool:
        tasks = []
        for file in list_of_files:
            tasks.append(execute_with_pool(pool, file))
        responses = await asyncio.gather(*tasks)
        response_list.append(responses)

    pool.close()
    await pool.wait_closed()
    return response_list


async def execute_with_pool(pool, file):
    cmd = ...  # I create some ClickHouse command from the filename
    try:
        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                response = await cursor.execute(cmd)
            return Response(response)
    except Exception:
        ...  # some exception handling
Is this an incorrect way of using the connection pool?

Using Python 3.8 on Linux, asynch 0.2.2.
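
For what it's worth, here is the kind of pattern I am considering instead: a minimal sketch that keeps the pool's whole lifecycle inside a single event loop by driving the async work with asyncio.run() from a plain (non-async) view, so nothing that holds an SSL transport outlives the loop it was created in. It reuses the create_pool/acquire/cursor calls from the snippet above; the host/port/credentials and the SQL command are placeholders:

import asyncio

from flask import Flask, jsonify, request
from asynch import create_pool  # same pool API as in the snippet above

app = Flask(__name__)

# placeholders for the real connection settings
host, port, username, pw = "localhost", 9000, "default", ""


async def insert_files(list_of_files):
    # create, use and fully close the pool inside the *current* event loop
    async with create_pool(minsize=50, maxsize=100, host=host, port=port, user=username, password=pw, secure=True) as pool:

        async def run_one(file):
            cmd = f"SELECT '{file}'"  # placeholder for the real ClickHouse command built from the filename
            async with pool.acquire() as conn:
                async with conn.cursor() as cursor:
                    return await cursor.execute(cmd)

        responses = await asyncio.gather(*(run_one(f) for f in list_of_files))

    # make sure every connection (and its SSL transport) is gone
    # before this coroutine returns and the loop is closed
    pool.close()
    await pool.wait_closed()
    return responses


@app.route("/route_name", methods=["POST"])
def insert():
    # plain sync view: asyncio.run() creates one fresh loop per request, runs the
    # whole pipeline, and only closes that loop after wait_closed() has finished
    list_of_files = request.json.get("files", [])
    return jsonify(asyncio.run(insert_files(list_of_files)))

The idea is simply that nothing asynchronous survives the request: the pool, its connections and the event loop are created and torn down together inside one asyncio.run() call, and each Gunicorn thread gets its own loop.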

ljluestc commented 2 months ago

from flask import Flask, request, jsonify, Response
import asyncio
from asyncpg import create_pool, Pool
from asyncpg.connection import Connection

app = Flask(__name__)

# Initialize the connection pool globally
pool: Pool = None

async def init_pool():
    global pool
    pool = await create_pool(
        min_size=50, max_size=100, 
        host='your_host', port='your_port', 
        user='your_username', password='your_password', 
        ssl=True
    )

@app.before_first_request
async def setup_pool():
    await init_pool()

@app.route('/route_name', methods=['POST'])
async def insert():
    list_of_files = request.json.get('files', [])
    response_list = []

    async def execute_with_pool(file):
        cmd = f"YOUR SQL COMMAND FOR {file}"  # Adjust this to your actual command
        try:
            async with pool.acquire() as conn:
                async with conn.transaction():
                    response = await conn.execute(cmd)
                    return response
        except Exception as e:
            print(f"Exception occurred: {e}")
            return str(e)

    tasks = [execute_with_pool(file) for file in list_of_files]
    responses = await asyncio.gather(*tasks)
    return jsonify(responses)

@app.teardown_appcontext
async def close_pool(exception=None):
    global pool
    if pool:
        await pool.close()

if __name__ == '__main__':
    app.run(threaded=True)  # Gunicorn will handle multi-threading, this may not be necessary

Rama3an commented 2 months ago

Hello!! Did you manage to solve the problem? I just encountered exactly the same thing :) @devMarlee