nakagami / pyfirebirdsql

Python DBAPI module for FirebirdSQL
BSD 2-Clause "Simplified" License

DB crash on Memory Full #111

Open fmmontieng opened 1 month ago

fmmontieng commented 1 month ago

Hi, sorry to bother you, but I've encountered an issue after starting to use the library (thanks a lot for your work!).

I am currently using Firebird 2.5 (I know it's old, but I can't change it at the moment).

I have a simple class that collects data into a list, and every 30 seconds, it uploads all the data to the database. This is the only program that causes my database to crash, but I can't figure out why.

The error states that it can't allocate memory, and the database ends up consuming a lot of RAM on the PC. This happens after about 24 hours of running the program, and the issue goes away as soon as I close the script.

This is my code:

import asyncio
import logging

import firebirdsql

logger = logging.getLogger(__name__)


class FirebirdConnection:
    def __init__(self, config):
        '''
            Initialize the FirebirdConnection object with the database configuration.

            Args:
                config (dict): A dictionary containing the database configuration parameters:
                    - DB_URL: The hostname or IP address of the Firebird server.
                    - DB_NAME: The name of the Firebird database.
                    - DB_PORT: The port number to connect to (default is 3050 for Firebird).
                    - DB_USER: The username for authentication.
                    - DB_PASS: The password for authentication.
        '''
        self.DB_URL = config['DB_URL']
        self.DB_NAME = config['DB_NAME']
        self.DB_PORT = config['DB_PORT']
        self.DB_USER = config['DB_USER']
        self.DB_PASS = config['DB_PASS']

        self.client = None
        # A list to store data that needs to be inserted into the database
        self.data_to_store = []

        self.lock = asyncio.Lock()
        self.stop = False

    async def connect(self):
        '''
            Attempt to create a new connection
        ''' 
        try:
            logger.info("Attempting to connect to the Firebird database...")
            self.client = firebirdsql.connect(
                host=self.DB_URL,
                database=self.DB_NAME,
                port=self.DB_PORT,
                user=self.DB_USER,
                password=self.DB_PASS
            )
            logger.info("Successfully connected to the Firebird database.")
            return True
        except Exception as e:
            logger.error(f"Failed to connect to the Firebird database: {e}")
            return False

    async def disconnect(self,):
        '''
            Close the connection

        '''
        self.client.close()

    async def healthy(self):
        """
        Checks if a Firebird SQL connection is healthy.

        """
        async with self.client.cursor() as cur:
            try:
                # Open a cursor and execute a simple query 
                cur.execute('SELECT 1 FROM RDB$DATABASE')
                cur.fetchone()
                return True

            except (firebirdsql.OperationalError, firebirdsql.DatabaseError) as e:
                print(f"Connection error: {e}")
                return False
            except Exception as e:
                print(f"An unexpected error occurred: {e}")
                return False 

    async def run(self,):
        """
            Continuously checks the database connection and stores data to the database.
            If the connection is lost, it attempts to reconnect.
        """
        # Check if the client connection is not initialized or closed
        while not self.stop:
            if self.client and await self.healthy():
                # Attempt to store data to the database if the connection is active
                await self.store_data_to_db()
                await asyncio.sleep(30)
            else:
                logger.warning('Waiting connection')
                await self.connect()

    async def store_data_to_db(self,):
        """
            Store data to the database using an asynchronous method.
            Ensures that no data is added to `self.data_to_store` while the operation is running.
        """
        # Acquire the lock to ensure exclusive access to self.data_to_store
        async with self.lock:
            # Check if there is any data to store
            if self.data_to_store == []:
                logger.info('No data to store')
                return

            query = "INSERT INTO SENSORS_LOG (ID, STIME, sVALUE) VALUES (?, ?, ?)"

            async with self.client.cursor() as cur:
                try:
                    # Use executemany to insert multiple rows efficiently
                    cur.executemany(query, self.data_to_store)
                    # Commit the transaction to save changes
                    self.client.commit()
                    logger.info(f"Inserted {len(self.data_to_store)} rows into SENSORS_LOG.")

                    # Clear the list after successful insertion
                    self.data_to_store.clear()

                except Exception as e:
                    # Log an error if the insertion fails
                    logger.error(f"Failed to insert sensor logs: {e}")

    async def register_data(self, data):
        """
            Safely add a tuple of data to the data_to_store list.

            Args:
                data (tuple): A tuple containing sensor data to be added to the list. 
                    Should be in the format (sensor_id, timestamp, value).
        """
        assert len(data)==3

        await self.lock.acquire()
        try:
            self.data_to_store.append(data)
            logger.info(f"Added data {data} to data_to_store.")
        finally:
            self.lock.release()    

    async def get_id(self, gname, sname):
        """
            Retrieve the ID of a sensor based on its group name and sensor name.

            Args:
                gname (str): The group name of the sensor.
                sname (str): The name of the sensor.

            Returns:
                int: The ID of the sensor if found, -1 if multiple entries are found, 0 if no entry is found.
        """
        # Use a cursor to interact with the database
        async with self.client.cursor() as cur:
            try:
                # Execute a query to find the sensor by group name and sensor name
                cur.execute("SELECT ID FROM SENSORS_HEAD WHERE SGROUP = ? AND SNAME = ?", (gname, sname))
                res = cur.fetchall()
            except Exception as e:
                # Log an error if the database request fails
                logger.error(f"DB request failed: {e}")
                return -1
            else:
                # Query succeeded, so `res` is defined: determine the result from the number of entries found
                if len(res) > 1:
                    logger.error(f"Multiple entries found for the sensor: {gname}-{sname}.")
                    ret = -1
                elif len(res) == 1:
                    ret = res[0][0]
                else:
                    ret = 0

            return ret

    async def add_sensor(self, gname, sname, dscr='', unit=''):
        """
            Add a new sensor to the database.

            Args:
                gname (str): The group name of the sensor.
                sname (str): The name of the sensor.
                dscr (str): A description of the sensor (optional).
                unit (str): The unit of the sensor (optional).
        """
        # Use a cursor to interact with the database
        async with self.client.cursor() as cur:
            try:
                # Execute an INSERT statement to add a new sensor to the database
                cur.execute(
                    "INSERT INTO SENSORS_HEAD (SGROUP, SNAME, DSCR, SUNIT) VALUES (?, ?, ?, ?)",
                    (gname, sname, dscr, unit)
                )
                # Commit the transaction to save changes to the database
                self.client.commit()
            except Exception as e:
                # Log an error if the insertion fails
                logger.error(f"Failed to add sensor {gname}-{sname}: {e}")

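        # get_id is a coroutine, so it must be awaited before comparing the result
        sensor_id = await self.get_id(gname, sname)
        assert sensor_id > 0
        return sensor_id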

Maybe I just made some big mistake and I'm missing something. Thanks for the help.

Matteo

nakagami commented 1 month ago

Is it the server that crashes? is_autocommit=True might change something.

fmmontieng commented 1 month ago

> Is it the server that crashes? is_autocommit=True might change something.

Yes, it's the server. I will try it.

fmmontieng commented 1 month ago

@nakagami Unfortunately this hasn't solved the problem; the server is still crashing.

The other thing I observed is that as soon as I close the script, the RAM used by the SQL server drops drastically.

nakagami commented 1 month ago

If you want to send the data to the sensor every 30 s, shouldn't you just connect, insert, and close each time?

fmmontieng commented 1 month ago

> If you want to send the data to the sensor every 30 s, shouldn't you just connect, insert, and close each time?

That's what I have done, and I think it is the best solution. It looks like it works properly this way.
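
For anyone landing on this thread later, here is a minimal sketch of the connect, insert, close pattern discussed above. It is not the code that was actually deployed: `flush_rows` and `connect_fn` are hypothetical names, and the connect callable is passed in as a parameter (in this thread it would be `firebirdsql.connect`) so that the sketch can run without a server. The table and column names are taken from the original post.

```python
from contextlib import closing


def flush_rows(connect_fn, rows, **connect_kwargs):
    """Open a connection, insert all rows, commit, and close.

    connect_fn is any DB-API style connect callable (hypothetical
    parameter; for this thread it would be firebirdsql.connect).
    Returns the number of rows written.
    """
    if not rows:
        return 0

    query = "INSERT INTO SENSORS_LOG (ID, STIME, sVALUE) VALUES (?, ?, ?)"

    # closing() calls conn.close() even if the insert raises, so no
    # connection outlives a single flush.
    with closing(connect_fn(**connect_kwargs)) as conn:
        cur = conn.cursor()
        try:
            cur.executemany(query, rows)
            conn.commit()
        except Exception:
            # Leave no open transaction behind before the connection closes.
            conn.rollback()
            raise
        finally:
            cur.close()
    return len(rows)
```

With the class from the original post, the 30-second loop would call something like `flush_rows(firebirdsql.connect, batch, host=..., database=..., port=..., user=..., password=...)` on a copy of the buffered list and clear the buffer only after the call returns. Because each flush closes its connection, nothing stays attached to the server between flushes, which fits the observation that server memory dropped as soon as the script exited.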