rsalmei / alive-progress

A new kind of Progress Bar, with real-time throughput, ETA, and very cool animations!
MIT License
5.53k stars 206 forks source link

Detect and display async queue updates #159

Closed StianOvrevage closed 2 years ago

StianOvrevage commented 2 years ago

I really like how featured alive-progress is.

But I have a problem I'm not able to work around.

I have a queue of tasks, and X workers processing the tasks and want to show a progress bar.

Ideally I would like to have the option of just calling bar() from workers but it doesn't seem alive-progress can work without with alive_bar() as bar.

My current approach:

updateQueue = asyncio.Queue()

# Put stuff in the queue. It starts full and never increases after here.

await monitor_queue(updateQueue)

async def monitor_queue(q: asyncio.Queue):
  previousQueueSize = q.qsize()
  with alive_bar(initialQueueSize, title='Queue progress', monitor=False, stats=False) as bar:
    if q.qsize() == 0:
      print("queue size 0, quitting")
      return
    await asyncio.sleep(1)

    queueChange = previousQueueSize - q.qsize()
    for i in range(queueChange):
      print("Queue change: " + str(queueChange))
      bar()
    previousQueueSize = q.qsize()

But after the first call to bar() it just hangs. The rest of the program continues to run though.

rsalmei commented 2 years ago

Hey @StianOvrevage, I'm not adept in asyncio, but I'll try to help you. First of all, have you seen the example in https://github.com/rsalmei/alive-progress/issues/11?

StianOvrevage commented 2 years ago

I did. But couldn't get it to work. Actually I think I managed to "solve" it by just doing a while True: inside the with ... like:

async def monitor_queue(start, q: asyncio.Queue):
  initialQueueSize = q.qsize()
  previousQueueSize = initialQueueSize
  with alive_bar(initialQueueSize, title='Rollout progress', stats=False) as bar:
    while True:
      if q.qsize() == 0:
        return
      await asyncio.sleep(1)

      queueChange = previousQueueSize - q.qsize()
      for i in range(queueChange):
        bar()
      previousQueueSize = q.qsize()
rsalmei commented 2 years ago

That's awesome, it does seem correct to me. That await asyncio.sleep(1) is the granularity you want to test by how much the queue changed, and you can send this number directly to bar()! Also, I can see some improvements... Something like this:

async def monitor_queue(start, q: asyncio.Queue):
  previousQueueSize = q.qsize()
  with alive_bar(previousQueueSize, title='Rollout progress', stats=False) as bar:
    while True:
      await asyncio.sleep(.2)  # to detect changes faster.

      currentQueueSize = q.qsize()
      queueChange = previousQueueSize - currentQueueSize
      if queueChange:
        bar(queueChange)
      if currentQueueSize == 0:
        return
      previousQueueSize = currentQueueSize

Notice the subtle bug that can happen if you call q.qsize() more than once in the loop: they can return different numbers when some task is finished in the background, so you'd lose that count, and alive-progress would finish with an alert. And you were calling it three times there. You also have to return only after updating the bar.

With this, I think you should be OK! 👍

rsalmei commented 2 years ago

Well, closing this, since it feels resolved. Let me know if anything new arises.