psycopg / psycopg2

PostgreSQL database adapter for the Python programming language
https://www.psycopg.org/

fetchone() fails, then printing cursor.query returns the wrong query #1710

Closed GoranSustekJr closed 2 months ago

GoranSustekJr commented 2 months ago

Describe the bug

I am using FastAPI for my API. When a request comes in, it runs a function from a different file by submitting it to a ThreadPoolExecutor so that it executes in the background:

from library.db import select, insert
import asyncio
import json
from typing import Set
from fastapi import FastAPI, Response, Request, Header, HTTPException, BackgroundTasks, APIRouter, WebSocket, WebSocketDisconnect
from concurrent.futures import ThreadPoolExecutor

app = FastAPI()

executor = ThreadPoolExecutor()

@app.websocket('/test') 
async def test(websocket: WebSocket):
    await websocket.accept()
    try:
        data = await websocket.receive_text()
        print(data)
        data = json.loads(data)
        stid = data["stid"]
        await add_async_to_db(stid)
        await websocket.close()
    except WebSocketDisconnect:
        print("Client dissconected")

async def add_async_to_db(stid: str):
    try:
        print("TID: ", stid)
        loop = asyncio.get_event_loop()
        await loop.run_in_executor(executor, insert.add, stid, processing_videos)
    except Exception as e:
        print(e)

A second file contains the insert queries:

from library.db import cursor

# Cursor
connection, cursor = cursor.connect_v2()
connection.autocommit = False

# Insert spotify track id (stid)
def insert_stid(stid):
    try:
        print("stid: ", stid)
        new = False
        qry_check = "SELECT tid FROM track WHERE stid = %s"
        print("hmmm: ;", stid)
        cursor.execute(qry_check, (stid, ))
        print("executed; ", stid)
        tid = cursor.fetchone()
        print("TID: ", tid)
        if tid is None:
            new = True
            qry = "INSERT INTO track ( stid ) VALUES ( %s ) RETURNING tid"
            cursor.execute(qry, (stid, ))
            tid = cursor.fetchone()
            print("NEW TID: ", tid)
            connection.commit()
        return tid[0], new
    except Exception as e:
        print(e, stid)
        print(cursor.query, stid)
        print(cursor.statusmessage, stid)
        connection.rollback()
        return None

def add(stid: str):
    try:
        insert_stid(stid)
    # More functions
    except Exception as e:
        print("Main failed; ", e, stid)

The connection and cursor variables come from a third file:

import psycopg2

def connect_v2():
    connection = psycopg2.connect(
            host = 'ip',
            database = 'db name',
            user = 'db user',
            password = 'db passwd'
            )
    return connection, connection.cursor()

When I send one or two requests in a short span of time, it executes nicely, but when I send 3, 4+ requests an error occurs at tid = cursor.fetchone(). The output is: no results to fetch, but when I print the query from the cursor I get the following: b'INSERT INTO tid_auid ( tid, auid ) VALUES ( 206, NULL )', which is a completely different query than the one executed. It seems that, because multiple instances of the same function are executing at the same time, one function overrides another function's cursor query.

GoranSustekJr commented 2 months ago

Managed to fix it by creating a new connection and cursor for every thread job and passing them down to the query function.
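
A minimal sketch of that workaround, assuming the same connect_v2 helper from library.db.cursor: every call opens (and closes) its own connection and cursor, so concurrent threads no longer share any state.

from library.db import cursor

def insert_stid(stid):
    # A fresh connection and cursor per call; nothing is shared between threads.
    connection, cur = cursor.connect_v2()
    connection.autocommit = False
    try:
        cur.execute("SELECT tid FROM track WHERE stid = %s", (stid, ))
        tid = cur.fetchone()
        new = tid is None
        if new:
            cur.execute("INSERT INTO track ( stid ) VALUES ( %s ) RETURNING tid", (stid, ))
            tid = cur.fetchone()
            connection.commit()
        return tid[0], new
    except Exception as e:
        print(e, stid)
        connection.rollback()
        return None
    finally:
        cur.close()
        connection.close()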

dvarrazzo commented 2 months ago

You can't share cursors across threads. You can share connections, but you will serialize the work. The best option is to use a connection pool so you can have some proper concurrency and no connection overhead.
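
For illustration, a minimal sketch of the pooled approach using psycopg2's built-in ThreadedConnectionPool (connection parameters are placeholders):

from psycopg2.pool import ThreadedConnectionPool

# Placeholder credentials; the pool keeps up to 10 connections shared across threads.
pool = ThreadedConnectionPool(1, 10, host='ip', dbname='db name',
                              user='db user', password='db passwd')

def insert_stid(stid):
    conn = pool.getconn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT tid FROM track WHERE stid = %s", (stid, ))
            tid = cur.fetchone()
            new = tid is None
            if new:
                cur.execute("INSERT INTO track ( stid ) VALUES ( %s ) RETURNING tid", (stid, ))
                tid = cur.fetchone()
        conn.commit()
        return tid[0], new
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)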

Also, if you use psycopg 3, not only do you get a proper connection pool, you can also use async connections and not bother with threads.
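
For example, with psycopg 3 and the psycopg_pool package (connection string values are placeholders), the FastAPI handler can await the query directly instead of dispatching it to a thread:

from psycopg_pool import AsyncConnectionPool

# Placeholder connection string; call "await pool.open()" once at application startup.
pool = AsyncConnectionPool("host=ip dbname=mydb user=myuser password=mypasswd", open=False)

async def insert_stid(stid: str):
    async with pool.connection() as conn:
        # The connection goes back to the pool and the transaction is
        # committed (or rolled back on error) when the block exits.
        cur = await conn.execute("SELECT tid FROM track WHERE stid = %s", (stid, ))
        tid = await cur.fetchone()
        new = tid is None
        if new:
            cur = await conn.execute(
                "INSERT INTO track ( stid ) VALUES ( %s ) RETURNING tid", (stid, ))
            tid = await cur.fetchone()
        return tid[0], new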