WebwareForPython / DBUtils

Database connections for multi-threaded environments
MIT License

Lots of errors when using PooledDB together with uWSGI, Flask and pymysql #39

Closed LostInDarkMath closed 2 years ago

LostInDarkMath commented 2 years ago

Hi there! Since I started using this library, I've noticed a strange error which unfortunately doesn't occur reproducibly. I'm sharing my observations here; maybe you have an idea.

relevant part of the stack trace

  File "/opt/maddox/./modules/core/user_handler/user_handler.py", line 131, in get_user_by_id
    return self._make_user_from_data_row(data_row=cursor.fetchone())
  File "/opt/maddox/./modules/core/user_handler/user_handler.py", line 509, in _make_user_from_data_row
    user_id = data_row['user_id']
KeyError: 'user_id'

relevant code section

    def get_user_by_id(self, user_id: str, cursor: DictCursor) -> User:
        result_count = cursor.execute(
            query=Sql.GET_USER_BY_ID,
            args=(user_id,),
        )
        if result_count != 1:
            raise MaddoxException(msg=f'User with id "{user_id}" not found.')

        return self._make_user_from_data_row(data_row=cursor.fetchone())

    def _make_user_from_data_row(self, data_row: Dict[str, Any]) -> User:
        user_id = data_row['user_id']  # KeyError is raised here
        role = Role(data_row['role_id'])
        # lots of other stuff

Since _make_user_from_data_row was called as shown in the stack trace above, it follows that result_count must have been equal to 1. But this contradicts the KeyError, which suggests that there are no rows to fetch. Do you have an idea what could go wrong here?

Cito commented 2 years ago

Which DB adapter are you using? I'm asking because in the DB API 2 spec, the cursor.execute() method has no defined return value. In order to get the number of results, you would use cursor.rowcount instead.
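
For illustration, here is the portable variant of that check, as a sketch reusing the names from your snippet:

cursor.execute(Sql.GET_USER_BY_ID, (user_id,))
if cursor.rowcount != 1:  # rowcount is defined by DB-API 2; the return value of execute() is not
    raise MaddoxException(msg=f'User with id "{user_id}" not found.')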

LostInDarkMath commented 2 years ago

Hi @Cito, thanks for your fast reply! I'm using the pymysql package.

Cito commented 2 years ago

Ok, pymysql seems to return the number of rows from execute() and executemany(). If you want to write portable code, it's better to check cursor.rowcount, but DBUtils passes the return value through anyway, so it should also work like that.

Also, if there is no result (i.e. result_count == 0), then fetchone() should actually return None, and you would get a TypeError instead of a KeyError. So I think the problem happens when pymysql converts the row to a dict, since it seems you have set a DictCursor class for the connection. You can do something like this to debug:

try:
    user_id = data_row['user_id']
except KeyError:
    print("user_id missing in data_row:", data_row)  # log the data row
    raise

Is your Sql.GET_USER_BY_ID query a constant? I noticed that the DictCursor of pymysql sometimes adds the table name to the keys; maybe that happens for some of your queries?
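
For illustration (hypothetical tables and query, and I have not checked this against every pymysql version): when a join selects two identically named columns, the DictCursor has to disambiguate the keys:

# hypothetical schema: both `users` and `orders` have an `id` column
cursor.execute('SELECT users.id, orders.id FROM users JOIN orders ON orders.user_id = users.id')
row = cursor.fetchone()
# the row may come back as {'id': 1, 'orders.id': 42} instead of two plain 'id' keys,
# so a lookup with the expected key name then raises a KeyError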

LostInDarkMath commented 2 years ago

Hi @Cito, yes, I expected the SteadyDBCursor from DBUtils to work exactly the same way as the Cursor and DictCursor from pymysql. And yes, almost all of the time this is the case: 1,600+ unit tests are green every time, and it almost always works in production, too. But sometimes these strange edge cases appear.

Yes, it must have been something with the dict conversion of the result rows. I implemented the logging and now I'm waiting for the problem to occur once more :)

Yes, Sql.GET_USER_BY_ID is a constant. Since the described error occurs very rarely, I think the queries are not the problem here :)

Cito commented 2 years ago

Sounds like a race condition then.

One explanation could be that the same connection is used in another thread - pymysql is not thread-safe at the connection level. So what can happen is that you execute the query, then another thread reuses the same connection to execute another query, and when pymysql then creates the row dict for the data_row, it uses the column names of that other query.
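
A minimal sketch of that interleaving (connection parameters are placeholders, and whether it actually misbehaves depends on timing):

import threading

import pymysql

# ONE connection shared between two threads - exactly what must not happen
conn = pymysql.connect(host='localhost', user='test', password='secret',
                       db='test', cursorclass=pymysql.cursors.DictCursor)

def worker(query):
    with conn.cursor() as cursor:
        cursor.execute(query)
        print(cursor.fetchone())  # may show the column names of the *other* query

t1 = threading.Thread(target=worker, args=('SELECT 1 AS a',))
t2 = threading.Thread(target=worker, args=('SELECT 2 AS b',))
t1.start(); t2.start()
t1.join(); t2.join()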

Do you use PooledDB? How do you fetch the connection? Is it a dedicated_connection()?

LostInDarkMath commented 2 years ago

Here is an overview of my setup:

import flask
import pymysql
from dbutils.pooled_db import PooledDB
from flask import Flask, g
from pymysql.constants import CLIENT

def setup_app(config, pooled_connections=20) -> Flask:
    flask_app = flask.Flask(__name__, static_folder=None)
    flask_app.config.from_object(obj=config)

    connection_pool = PooledDB(
        mincached=pooled_connections,  # start with that number of cached connections
        maxcached=50,  # maximum number of idle connections in pool
        maxshared=0,  # all connections are dedicated, not shared
        maxconnections=150,  # None = unlimited, use same value as MariaDB
        blocking=True,  # True = wait for conn, False = throw Error if no available conns
        maxusage=None,  # None means unlimited
        setsession=[],  # SQL command for session preparation
        reset=True,  # always do a rollback when a connection is returned to the pool
        ping=1,  # ping connections when fetched from the pool
        client_flag=CLIENT.MULTI_STATEMENTS,
        cursorclass=pymysql.cursors.DictCursor,
        creator=pymysql,  # which library to use to create connections
        host=flask_app.config['MARIADB_HOST'],
        port=flask_app.config['MARIADB_PORT'],
        user=flask_app.config['MARIADB_USER'],
        password=flask_app.config['MARIADB_PASSWORD'],
        db=flask_app.config['MARIADB_DATABASE'],
    )
    flask_app.config['DB_POOL'] = connection_pool

    @flask_app.teardown_appcontext
    def close_db(_) -> None:
        """Closes the database at the end of the request."""
        if hasattr(g, 'connection'):
            g.connection.close()

    return flask_app

How I fetch the connections in the API endpoints:

def get_connection() -> Union[Connection, PooledDedicatedDBConnection]:
    if 'connection' not in g:
        if env_mariadb.use_pooling:
            g.connection = get_pooled_connection()
        else:
            g.connection = get_direct_connection()

    return g.connection

def get_direct_connection() -> Connection:
    # this is the old, classic way; it has worked in production systems for years without any problems.
    return create_connection(
        host=current_app.config['MARIADB_HOST'],
        port=current_app.config['MARIADB_PORT'],
        user=current_app.config['MARIADB_USER'],
        password=current_app.config['MARIADB_PASSWORD'],
        database=current_app.config['MARIADB_DATABASE'],
    )

def get_pooled_connection() -> PooledDedicatedDBConnection:
    # this is the new implementation using your library
    db_pool: PooledDB = current_app.config['DB_POOL']
    return db_pool.dedicated_connection()

Interestingly, the connection pooling does not work correctly at all. I often get errors like Packet sequence number wrong - got 1 expected 0, which indicates that connections are shared between threads. What's wrong here? Is it an issue with uWSGI, since I'm using 4 uWSGI workers?

LostInDarkMath commented 2 years ago

I mean, the workers lead to a Flask application that runs in several different processes. Is that the root of the evil here? What are the best practices for using uWSGI together with PooledDB?

Cito commented 2 years ago

I have been using DBUtils only in purely multi-threaded environments, not with uWSGI. I assume that if you have 4 worker processes, you will also get 4 connection pools. With which exact configuration parameters do you start uWSGI?

LostInDarkMath commented 2 years ago

Here is the command from my docker-compose.yaml file that runs uWSGI together with flask:

    command:
      sh
      -c
      'uwsgi
      --wsgi-file /opt/foo/services/core/api_gateway/run.py
      --callable app
      --http-socket 0.0.0.0:8080
      --master
      --need-app
      --logger "queueFull stdio"
      --logger "uwsgiGeneral file:/dev/null"
      --logger stdio
      --log-route "uwsgiGeneral \[pid:.+\|app:.+\|req:.+\].*\[.+\]\s(POST|GET|PUT|DELETE|HEAD).+"
      --log-route "queueFull .+listen queue.+full.+"
      --workers 4'

It seems like setup_app() is only called once, which means that all workers (= processes) share one single connection pool. That would explain all the problems. Here are the logs when I run it:

*** Starting uWSGI 2.0.20 (64bit) on [Wed Feb 16 12:10:44 2022] ***
compiled with version: 10.2.1 20210110 on 14 February 2022 12:06:24
os: Linux-5.13.0-28-generic #31~20.04.1-Ubuntu SMP Wed Jan 19 14:08:10 UTC 2022
nodename: 9f72e0cb2f33
machine: x86_64
clock source: unix
pcre jit disabled
detected number of CPU cores: 8
current working directory: /opt/foo
detected binary path: /opt/foo/.venv/bin/uwsgi
your memory page size is 4096 bytes
detected max file descriptor number: 1048576
lock engine: pthread robust mutexes
thunder lock: disabled (you can enable it with --thunder-lock)
uwsgi socket 0 bound to TCP address 0.0.0.0:8080 fd 6
Python version: 3.8.12 (default, Feb  8 2022, 05:22:14)  [GCC 10.2.1 20210110]
*** Python threads support is disabled. You can enable it with --enable-threads ***
Python main interpreter initialized at 0x56378f4e32f0
your server socket listen backlog is limited to 100 connections
your mercy for graceful operations on workers is 60 seconds
mapped 364600 bytes (356 KB) for 4 cores
*** Operational MODE: preforking ***
2022-02-16 12:10:52.839: INFO     Using Connection Pooling
WSGI app 0 (mountpoint='') ready in 9 seconds on interpreter 0x56378f4e32f0 pid: 6 (default app)
*** uWSGI is running in multiple interpreter mode ***
spawned uWSGI master process (pid: 6)
spawned uWSGI worker 1 (pid: 55, cores: 1)
spawned uWSGI worker 2 (pid: 56, cores: 1)
spawned uWSGI worker 3 (pid: 57, cores: 1)
spawned uWSGI worker 4 (pid: 58, cores: 1)

The log line

2022-02-16 12:10:52.839: INFO     Using Connection Pooling

comes from the setup_app() method.

Cito commented 2 years ago

I think even that should be no problem if it were guaranteed that one application context corresponds to only one process/thread. But yes, I guess it is caused by one process with one pool being forked. What does the run.py file look like? Is that the script you posted with the setup_app function?

LostInDarkMath commented 2 years ago

Yes, run.py simply calls the setup_app() function with something like app = setup_app(config=ProductionConfig()); nothing special there.

Yes, I think that could be the problem. But I do not know how uWSGI does the process handling/forking under the hood. That's why I asked whether someone has experience with this special (but, I guess, common) setup.

Cito commented 2 years ago

Hi @LostInDarkMath. I have looked into this now a bit more.

So what happens is that you create one connection pool, and then uWSGI forks 4 worker processes. Each of these workers now gets a clone of that same pool, including the same 20 pre-created connections. When the first worker then asks its pool for a connection, the pool hands out the first pre-created connection. But the pool of the second worker does not see that this connection is now in use, since it is only a clone of the first pool (not the same pool), so it happily hands out that very same connection when asked for a dedicated connection. This causes the errors you are noticing.
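
You can see the underlying effect without any database involved; a minimal, Unix-only sketch with os.fork(), where the list stands in for the pool's pre-created idle connections:

import os

idle_connections = ['conn1', 'conn2']  # the pool's bookkeeping of idle connections

pid = os.fork()  # roughly what the uWSGI master does for each worker
if pid == 0:
    # child ("worker 1") checks a connection out of *its copy* of the list
    idle_connections.pop(0)
    os._exit(0)
else:
    os.waitpid(pid, 0)
    # parent ("worker 2") still sees both connections as idle
    print(idle_connections)  # prints ['conn1', 'conn2']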

One solution would be to not set up the pool with pre-created connections, i.e. to set mincached=0.

A better solution would be to create the pool dynamically with a function like this:

from flask import current_app

def get_db_pool() -> PooledDB:
    db_pool = current_app.config.get('DB_POOL')
    if not db_pool:
        # create the pool lazily, i.e. *after* uWSGI has forked the workers
        db_pool = PooledDB(...)  # same parameters as in setup_app() above
        current_app.config['DB_POOL'] = db_pool
    return db_pool

def get_pooled_connection() -> PooledDedicatedDBConnection:
    db_pool = get_db_pool()
    return db_pool.dedicated_connection()

Then each worker would create its own pool the first time a connection is requested. Of course, the size of the pools should be reduced accordingly (i.e. in this case to about a quarter of the total size).
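
For illustration, scaled down for 4 workers, the numbers from the setup_app() above could look like this (the exact values are of course yours to tune):

connection_pool = PooledDB(
    mincached=5,        # was 20; 20 connections / 4 workers
    maxcached=12,       # was 50
    maxconnections=37,  # was 150, so all four pools together stay within the MariaDB limit
    # ... all other parameters as before
)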

And of course, you should also configure uwsgi to use some threads for each worker process, because otherwise it does not make sense to use connection pools. These threads would then share the same connection pool.
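
Your startup log above even hints at the missing thread support ("Python threads support is disabled"). For example (the thread count is only a suggestion, and as far as I know --threads also enables Python thread support, so --enable-threads should not be needed in addition):

uwsgi --workers 4 --threads 8 ...  # plus all your other options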

LostInDarkMath commented 2 years ago

Thank you very much @Cito! I'll have a look at it later :)

LostInDarkMath commented 2 years ago

Your suggestions work very well for me, thank you @Cito!