peter-wangxu / persist-queue

A thread-safe disk based persistent queue in Python
BSD 3-Clause "New" or "Revised" License
335 stars, 50 forks

File is not getting cleared after get() function #176

Closed dark-himaro closed 2 years ago

dark-himaro commented 3 years ago

I added data to a file-based queue. After calling get(), the elements are removed from the queue, but if we open the file the data is still there. So I think the file size will keep growing as we keep adding data.

AntonMelentiev commented 2 years ago

Got the same issue. And @dark-himaro was right. File size keeps increasing.

After adding an element to the queue with put() and getting the last element with get(), qsize() will show that the number of elements in the queue remains the same, but the size of the queue file has increased.

peter-wangxu commented 2 years ago

Can you please share a code snippet and the version you are using?

AntonMelentiev commented 2 years ago

Sure. I am trying to keep a buffer of video stream frames. To read the streams you need to install opencv-python. The following example shows a basic implementation with debug prints, which show that qsize() goes down but the disk usage keeps growing.

import os
import random
from contextlib import suppress
from pathlib import Path

import cv2
from persistqueue import Queue

def get_folder_size(folder_path: str) -> int:
    """Return the total size in bytes of all files under folder_path."""
    size = 0

    # os.walk already descends into subfolders, so no explicit recursion is needed.
    for path, _folders, files in os.walk(folder_path):
        for file in files:
            # A file may disappear between listing and stat while the queue is in use.
            with suppress(FileNotFoundError):
                size += os.path.getsize(os.path.join(path, file))

    return size

def check(stream: str, folder_name: str):
    frame_buffer = 10
    queue_folder = str(Path(Path(__file__).parent, "queues", folder_name))
    queue = Queue(queue_folder)
    capture = cv2.VideoCapture(stream)

    while capture.isOpened():
        ret, frame = capture.read()

        if ret:
            queue.put(frame)

            print(f"Queue {folder_name} size: {queue.qsize()}")
            print(f"Folder {folder_name} size: {get_folder_size(queue_folder)} bytes")

            # In my real code a condition that is sometimes False stands in place of
            # random.choice((1, 0)), so queue.qsize() can grow much larger than frame_buffer.
            # random.choice is used here to let the queue overfill from time to time.
            if random.choice((1, 0)):
                while queue.qsize() > frame_buffer:
                    queue.get()

            cv2.imshow("Frame", frame)

            # Press Q on keyboard to exit
            if cv2.waitKey(25) & 0xFF == ord("q"):
                break

    capture.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    from multiprocessing import Process

    # Stream URLs taken from https://github.com/grigory-lobkov/rtsp-camera-view/issues/3
    streams = [
        "http://210.148.114.53/-wvhttp-01-/GetOneShot?image_size=640x480&frame_count=1000000000",
        "http://158.58.130.148:80/mjpg/video.mjpg",
    ]

    for stream_id, stream in enumerate(streams):
        process = Process(target=check, args=(stream, str(stream_id)))
        process.start()
AntonMelentiev commented 2 years ago

Forgot to mention the version I use: persist-queue = "^0.7.0"

peter-wangxu commented 2 years ago

All you need is to call queue.task_done() to persist your state (which, in your case, removes the data you got with get()).

AntonMelentiev commented 2 years ago

@peter-wangxu, can you please specify in which line in the example it will be correct to use queue.task_done()? Should it be used both after queue.put and queue.get or instead of queue.get?

peter-wangxu commented 2 years ago

Only after get().

This is covered in the README.

AntonMelentiev commented 2 years ago

Unfortunately, it does not help. I added queue.task_done() right after queue.get() in the example above. The disk space used by the queue still grows continuously.

I also tried adding queue.task_done() at the end of the while loop, to complete the task at the end of each iteration. That does not help either.

peter-wangxu commented 2 years ago

It looks like you are using multiple processes; try multiple threads instead.

The queue is thread-safe, not process-safe.
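
As a rough illustration of the advice above, the sketch below runs a producer and a consumer as threads inside one process and acknowledges every get() with task_done(). It uses the standard library's queue.Queue as a stand-in so it runs without extra dependencies; the assumption is that persistqueue.Queue(path) can be dropped in at the marked line, since the snippets in this thread call the same put(), get() and task_done() methods on it. The function names are made up for the example.

```python
import queue
import threading
from typing import List


def producer(q: queue.Queue, count: int) -> None:
    """Put `count` integers on the queue."""
    for i in range(count):
        q.put(i)


def consumer(q: queue.Queue, out: List[int], count: int) -> None:
    """Take `count` items off the queue, acknowledging each one."""
    for _ in range(count):
        item = q.get()   # blocks until an item is available
        out.append(item)
        q.task_done()    # acknowledge the get() that was just completed


def run_with_threads(count: int = 100) -> List[int]:
    # Stand-in queue (assumption): replace with persistqueue.Queue("some/path")
    # to try the same pattern against the on-disk queue.
    q: queue.Queue = queue.Queue()
    out: List[int] = []
    threads = [
        threading.Thread(target=producer, args=(q, count)),
        threading.Thread(target=consumer, args=(q, out, count)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    q.join()  # returns once every put() has a matching task_done()
    return out


if __name__ == "__main__":
    print(len(run_with_threads(100)))
```

Both threads share a single queue object in one process, which is the setup the "thread-safe" guarantee refers to.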

AntonMelentiev commented 2 years ago

Let's drop processes and threads altogether and use queue.task_done(). With the following example, the problem is still there.

import os
import random
from contextlib import suppress
from pathlib import Path

import cv2
from persistqueue import Queue

def get_folder_size(folder_path: str) -> int:
    """Return the total size in bytes of all files under folder_path."""
    size = 0

    # os.walk already descends into subfolders, so no explicit recursion is needed.
    for path, _folders, files in os.walk(folder_path):
        for file in files:
            # A file may disappear between listing and stat while the queue is in use.
            with suppress(FileNotFoundError):
                size += os.path.getsize(os.path.join(path, file))

    return size

def check(stream: str, folder_name: str):
    frame_buffer = 10
    queue_folder = str(Path(Path(__file__).parent, "queues", folder_name))
    queue = Queue(queue_folder)
    capture = cv2.VideoCapture(stream)

    while capture.isOpened():
        ret, frame = capture.read()

        if ret:
            queue.put(frame)

            print(f"Queue {folder_name} size: {queue.qsize()}")
            print(f"Folder {folder_name} size: {get_folder_size(queue_folder)} bytes")

            # In my real code a condition that is sometimes False stands in place of
            # random.choice((1, 0)), so queue.qsize() can grow much larger than frame_buffer.
            # random.choice is used here to let the queue overfill from time to time.
            if random.choice((1, 0)):
                while queue.qsize() > frame_buffer:
                    queue.get()
                    queue.task_done()

            cv2.imshow("Frame", frame)

            # Press Q on keyboard to exit
            if cv2.waitKey(25) & 0xFF == ord("q"):
                break

    capture.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    check("http://210.148.114.53/-wvhttp-01-/GetOneShot?image_size=640x480&frame_count=1000000000", "11")
InzamamAnwar commented 2 years ago

I can confirm that the file does not shrink after getting elements from the queue. Below are two simple snippets that can be used to verify this.

Populate the Queue

import persistqueue

queue = persistqueue.FIFOSQLiteQueue(path="queues", multithreading=False)

for i in range(10000):
    queue.put({
        "id": i,
        "string": '''
José Ramos-Horta (1976)
Ramos-Horta was actively involved in the development of political awareness in Portuguese Timor, which caused him to be exiled for two years in 1970–71 to Portuguese East Africa. His grandfather, before him, had also been exiled, from Portugal to the Azores Islands, then Cape Verde, Portuguese Guinea and finally to Portuguese Timor.

A moderate in the emerging Timorese nationalist leadership, Ramos-Horta was appointed Foreign Minister in the "Democratic Republic of East Timor" government proclaimed by the pro-independence parties in November 1975. When appointed minister, Ramos-Horta was only 25 years old. Three days before the Indonesian troops invaded, Ramos-Horta left East Timor to plead the Timorese case before the UN.

Ramos-Horta arrived in New York to address the UN Security Council and urge them to take action in the face of the Indonesian occupation during which an estimated 102,000 East Timorese would die.[7] Ramos-Horta was the Permanent Representative of Fretilin to the UN for the next ten years. His friends at that time mentioned that he arrived in the United States with a total of $25 in his pocket. His pecuniary situation was often straitened in that period. He survived partly by the grace of Americans who admired his politics and his determination. Furthermore, he was obliged to travel worldwide to explain his party's position.

In 1993, the Rafto Prize was awarded to the people of East Timor. Foreign-minister-in-exile Ramos-Horta represented his nation at the prize ceremony. In May 1994, Philippine President Fidel Ramos (no relation), bowing to pressure from Jakarta, tried to ban an international conference on East Timor in Manila and blacklisted Ramos-Horta, with the Thai government following suit later that year by declaring him persona non grata.[8]

In December 1996, Ramos-Horta shared the Nobel Peace Prize with fellow Timorese Bishop Ximenes Belo. The Nobel Committee chose to honour the two laureates for their "sustained efforts to hinder the oppression of a small people", hoping that "this award will spur efforts to find a diplomatic solution to the conflict of East Timor based on the people's right to self-determination". The Committee considered Ramos-Horta "the leading international spokesman for East Timor's cause since 1975".[9]

Ramos-Horta played a leading role in negotiating the institutional foundations for independence. He led the Timorese delegation at an important joint workshop with UNTAET on 1 March 2000 to tease out a new strategy, and identify institutional needs. The outcome was an agreed blueprint for a joint administration with executive powers, including leaders of the National Congress for Timorese Reconstruction (CNRT). Further details were worked out in a conference in May 2000. The Special Representative of the UN Secretary-General in East Timor, Sérgio Vieira de Mello, presented the new blueprint to a donor conference in Lisbon,[10] on 22 June 2000, and to the UN Security Council on 27 June 2000.[11] On 12 July 2000, the NCC adopted a regulation establishing a Transitional Cabinet composed of four East Timorese and four UNTAET representatives.[12] The revamped joint administration successfully laid the institutional foundations for independence, and on 27 September 2002, East Timor joined the United Nations. Ramos-Horta was its first Foreign Minister.'''
    })

print(queue.qsize())

The above code block results in a file of ~40 MB.

Depopulate Queue

import persistqueue

queue = persistqueue.FIFOSQLiteQueue(path="queues", multithreading=False)

while queue.qsize() > 1000:
    obj = queue.get()
    queue.task_done()

print(queue.qsize())

Popping 90% of the data should have reduced the file size by the same 90%, but it stays the same.

peter-wangxu commented 2 years ago

This issue is related to the sqlite3-based queue. I will submit a small fix.
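
For background on why a sqlite3-backed file can stay large, here is a minimal sketch using only the standard library's sqlite3 module, not persist-queue itself. With auto_vacuum disabled, deleting rows leaves the freed pages inside the database file, and the file only shrinks once VACUUM is run. The table layout and function name are invented for the example, and whether the planned fix takes this route is not stated in this thread.

```python
import os
import sqlite3
import tempfile
from typing import Tuple


def demo_vacuum(rows: int = 2000) -> Tuple[int, int, int]:
    """Return the database file size after insert, after delete, and after VACUUM."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        conn = sqlite3.connect(db_path)
        # Make the no-auto-vacuum behaviour explicit; it must be set before
        # the first table is created.
        conn.execute("PRAGMA auto_vacuum = NONE")
        conn.execute("CREATE TABLE q (id INTEGER PRIMARY KEY, data BLOB)")
        conn.executemany(
            "INSERT INTO q (data) VALUES (?)",
            ((b"x" * 4096,) for _ in range(rows)),
        )
        conn.commit()
        after_insert = os.path.getsize(db_path)

        # Deleting rows moves their pages to the free list inside the file.
        conn.execute("DELETE FROM q")
        conn.commit()
        after_delete = os.path.getsize(db_path)

        # VACUUM rebuilds the database and releases the unused pages.
        conn.execute("VACUUM")
        after_vacuum = os.path.getsize(db_path)
        conn.close()
    return after_insert, after_delete, after_vacuum


if __name__ == "__main__":
    inserted, deleted, vacuumed = demo_vacuum()
    print(f"after insert: {inserted} bytes")
    print(f"after delete: {deleted} bytes")
    print(f"after vacuum: {vacuumed} bytes")
```

The second number is expected to be no smaller than the first, and the third much smaller than both, which matches the behaviour reported above for FIFOSQLiteQueue.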

peter-wangxu commented 2 years ago

I was not able to reproduce your issue. Here is my code; you can run it locally.

create.py

import persistqueue

queue = persistqueue.Queue(path="queues")

for i in range(10000):
    queue.put({
        "id": i,
        "string": '''
José Ramos-Horta (1976)
Ramos-Horta was actively involved in the development of political awareness in Portuguese Timor, which caused him to be exiled for two years in 1970–71 to Portuguese East Africa. His grandfather, before him, had also been exiled, from Portugal to the Azores Islands, then Cape Verde, Portuguese Guinea and finally to Portuguese Timor.

A moderate in the emerging Timorese nationalist leadership, Ramos-Horta was appointed Foreign Minister in the "Democratic Republic of East Timor" government proclaimed by the pro-independence parties in November 1975. When appointed minister, Ramos-Horta was only 25 years old. Three days before the Indonesian troops invaded, Ramos-Horta left East Timor to plead the Timorese case before the UN.

Ramos-Horta arrived in New York to address the UN Security Council and urge them to take action in the face of the Indonesian occupation during which an estimated 102,000 East Timorese would die.[7] Ramos-Horta was the Permanent Representative of Fretilin to the UN for the next ten years. His friends at that time mentioned that he arrived in the United States with a total of $25 in his pocket. His pecuniary situation was often straitened in that period. He survived partly by the grace of Americans who admired his politics and his determination. Furthermore, he was obliged to travel worldwide to explain his party's position.

In 1993, the Rafto Prize was awarded to the people of East Timor. Foreign-minister-in-exile Ramos-Horta represented his nation at the prize ceremony. In May 1994, Philippine President Fidel Ramos (no relation), bowing to pressure from Jakarta, tried to ban an international conference on East Timor in Manila and blacklisted Ramos-Horta, with the Thai government following suit later that year by declaring him persona non grata.[8]

In December 1996, Ramos-Horta shared the Nobel Peace Prize with fellow Timorese Bishop Ximenes Belo. The Nobel Committee chose to honour the two laureates for their "sustained efforts to hinder the oppression of a small people", hoping that "this award will spur efforts to find a diplomatic solution to the conflict of East Timor based on the people's right to self-determination". The Committee considered Ramos-Horta "the leading international spokesman for East Timor's cause since 1975".[9]

Ramos-Horta played a leading role in negotiating the institutional foundations for independence. He led the Timorese delegation at an important joint workshop with UNTAET on 1 March 2000 to tease out a new strategy, and identify institutional needs. The outcome was an agreed blueprint for a joint administration with executive powers, including leaders of the National Congress for Timorese Reconstruction (CNRT). Further details were worked out in a conference in May 2000. The Special Representative of the UN Secretary-General in East Timor, Sérgio Vieira de Mello, presented the new blueprint to a donor conference in Lisbon,[10] on 22 June 2000, and to the UN Security Council on 27 June 2000.[11] On 12 July 2000, the NCC adopted a regulation establishing a Transitional Cabinet composed of four East Timorese and four UNTAET representatives.[12] The revamped joint administration successfully laid the institutional foundations for independence, and on 27 September 2002, East Timor joined the United Nations. Ramos-Horta was its first Foreign Minister.'''
    })

print(queue.qsize())

delete.py

import persistqueue

queue = persistqueue.Queue(path="queues")

while queue.qsize() > 1000:
    obj = queue.get()
    queue.task_done()

print(queue.qsize())
python3 create.py
# file size is large
du -sh *
>  33M  queues

python3 delete.py

du -sh *
> 3.6M  queues