eth-cscs / pyfirecrest

Python wrappers for the FirecREST API
https://pyfirecrest.readthedocs.io
BSD 3-Clause "New" or "Revised" License

AsyncFirecrest: Practicality and Efficiency #94

Closed khsrali closed 5 months ago

khsrali commented 8 months ago

If one uses two consecutive asyncio.run() calls, everything should be alright, because as far as I understand asyncio.run() is supposed to both create and close an event loop. However, AsyncFirecrest for some reason prevents a second run from creating a new event loop, which I don't think is expected behavior. I'm not sure where this problem originates. It can be reproduced with something like this (a dummy example, just to demonstrate):

import asyncio
from firecrest import AsyncFirecrest

_client = AsyncFirecrest(...)  # define a client here

async def get_systems():
    systems = await _client.all_systems()

asyncio.run(get_systems())
asyncio.run(get_systems())

The first run passes successfully, and the second one, which is supposed to create a new event loop, fails to do so:

RuntimeError: Event loop is closed
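
For reference, a minimal standard-library sketch (no pyfirecrest involved) of the expectation described above: each asyncio.run() call creates its own event loop and closes it on exit, so two consecutive calls on a plain coroutine work. The function name is illustrative only.

```python
import asyncio

async def current_loop():
    # Return the event loop that is running this coroutine.
    return asyncio.get_running_loop()

first = asyncio.run(current_loop())
second = asyncio.run(current_loop())

print(first is second)                        # False: a fresh loop per call
print(first.is_closed(), second.is_closed())  # True True: both closed on exit
```

Any object that keeps state tied to the first loop will therefore see that loop already closed during the second call.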

ekouts commented 8 months ago

Hi @khsrali , thanks for reporting this. I had a quick look trying to figure out where this error is coming from and I don't know yet. 🤔 As far as I understand -ideally- you are not meant to call asyncio.run more than once, but it is still a valid question why this fails. Next week I will have a bit more time to investigate this, so we will let you know.

khsrali commented 8 months ago

Hi @ekouts, thanks for following up!

You are right, so instead I tried this, which I think should be a valid asyncio script:


import asyncio
from firecrest import AsyncFirecrest

_client = AsyncFirecrest(...)  # define a client here

def get_systems_twice():

    async def get_systems():
        systems = await _client.all_systems()
        print(systems)

    async def main():
        tasks = [get_systems(), get_systems()]
        await asyncio.gather(*tasks)

    loop = asyncio.new_event_loop()
    loop.run_until_complete(main())
    loop.close() 

get_systems_twice() # runs as expected
get_systems_twice() # raises RuntimeError

Here I explicitly create a new event loop on each call, but the second call raises:

RuntimeError: <asyncio.locks.Lock object at 0x7fb4aa297390 [locked]> is bound to a different event loop

It seems this is due to the way AsyncClient.py is locking loops.

ekouts commented 8 months ago

Hi @khsrali and sorry for the delayed response. So, I found out where the error was coming from: it's from the fact that I am using the same httpx.AsyncClient() for all the requests of a client.

Based on the original code, here is a small example with the same error:

import asyncio
import httpx

class Test():
    def __init__(self):
        self._session = httpx.AsyncClient()

    async def print_facts(self):
        resp = await self._session.get('https://catfact.ninja/fact')
        print(resp.json())

t = Test()

async def facts():
    await t.print_facts()

asyncio.run(facts())
asyncio.run(facts())

Sorry for that. This was me being a bit lazy and assuming that this may speed some things up without testing. It's probably worth investing some time from our side to make this a bit better/more efficient.

But what are you trying to achieve in your code? There are different ways to fix the issue without changing pyfirecrest. For example:

  1. Create one client per async function (in this case facts or get_systems).
    async def facts():
      t = Test()
      await t.print_facts()
  2. Use asyncio.gather to run all the methods you want to test. In this way you can also take advantage of different features of the client, like custom rate limit per microservice, retries in case the CSCS rate limit is reached, merging of requests when possible (for tasks and slurm jobs polling).
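
A minimal sketch combining the two options above, using a hypothetical StubClient in place of a real client so that it runs without network access. The client is created inside the coroutine passed to asyncio.run(), so every call of the outer function gets a fresh loop and a fresh client.

```python
import asyncio

class StubClient:
    """Hypothetical stand-in for an async API client; makes no network calls."""

    async def all_systems(self):
        await asyncio.sleep(0.01)  # pretend to wait for a response
        return ["system-a", "system-b"]

def get_systems_twice():
    async def main():
        client = StubClient()  # created inside the loop that uses it
        return await asyncio.gather(client.all_systems(), client.all_systems())

    return asyncio.run(main())

first_call = get_systems_twice()
second_call = get_systems_twice()  # new loop and new client on every call
```

The stub holds no loop-bound state, so it would not fail either way; the point is only the structure, at the cost of losing connection reuse between calls.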

khsrali commented 8 months ago

Thank you @ekouts for following up. It's good that you found the issue, I'll look it up.

It's probably worth investing some time from our side to make this a bit better/more efficient.

I think it certainly is. I have run some tests on the efficiency of AsyncFirecrest which I can share with you. In short, with a few slight changes (perhaps also to the locking policy, but I'm not sure), things become at least twice as fast.

But what are you trying to achieve in your code?

Basically, I'm trying to make a function that can be called multiple times, to download a different bunch of files at each call. One can use the workaround you provided, but if one needs to do two different operations within, let's say, facts(), such as get_systems and external_download, it still fails. I'm just saying that this way it's tricky to make practical use of AsyncFirecrest. And this is a pity, because everything is more or less already there in AsyncFirecrest; it just needs a bit of enhancement.

Unfortunately eiger is down now, and I have to find an alternative machine for a final check of my tests; then I'll share them with you.

ekouts commented 8 months ago

I have some testing regarding efficiency of AsyncFirecrest which I can share with you.

Thank you, I would be very interested in this!

Regarding the httpx.AsyncClient, I think it is worth keeping it like this since I found the following in the docs (https://www.python-httpx.org/async/), but I will try to get some benchmarks at some point to confirm this.

In order to get the most benefit from connection pooling, make sure you're not instantiating multiple client instances - for example by using async with inside a "hot loop". This can be achieved either by having a single scoped client that's passed throughout wherever it's needed, or by having a single global client instance.

And finally on this:

Basically, I'm trying to make a function that can be called multiple times, to download a different bunch of files at each call. One can use the workaround you provided, but if one needs to do two different operations within, let's say, facts(), such as get_systems and external_download, it still fails. I'm just saying that this way it's tricky to make practical use of AsyncFirecrest.

How I imagined the AsyncFirecrest to be useful was to create different "workflows", async functions with many awaited functions inside and then in your main you would gather all these workflows. Something like this example. I think something like this would cover your use case but maybe I am not understanding it well enough.

I could also add some function in the AsyncFirecrest class that would close the connection and re-open it so you would avoid creating new objects, but I still think a solution with await and gather would be more efficient. We can discuss it more today in the call, if we have time, or on another call if you would like.
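
A rough sketch of the "workflows" idea, under the assumption that each workflow is a coroutine with several awaited steps. fake_call and download_workflow are hypothetical placeholders, not pyfirecrest methods, and the delays only simulate waiting for a server.

```python
import asyncio
import time

async def fake_call(name, delay=0.05):
    # Hypothetical placeholder for an awaited client method.
    await asyncio.sleep(delay)
    return name

async def download_workflow(path):
    # Steps inside one workflow run in order...
    await fake_call(f"stage {path}")
    return await fake_call(f"fetch {path}")

async def main(paths):
    # ...while separate workflows run concurrently.
    return await asyncio.gather(*(download_workflow(p) for p in paths))

start = time.monotonic()
results = asyncio.run(main(["a.dat", "b.dat", "c.dat"]))
elapsed = time.monotonic() - start
print(results, f"{elapsed:.2f}s")
```

Run one after another, the six simulated calls would take about 0.3s; gathered, the three workflows overlap and the total is close to the duration of a single workflow.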

khsrali commented 7 months ago

Hi @ekouts, @jpdorsch (P.S. unfortunately I don't have Elia's GitHub handle to tag here; please do so if you have it),

In today's meeting, I mentioned a simple test I conducted, and I'd like to share the results with you. AsyncFirecrest seems to perform about 10 times slower than Firecrest. For this test, I submitted requests for external_download of 15 dummy files, each being 1MB in size. I thought this information might be worth exploring further together.

The one with Firecrest would give:

waiting time (real minues user)  7.89s
reguest rate [#/min] 113.12s

and the one with AsyncFirecrest:

waiting time (real minues user)  79.72s
, reguest rate [#/min] 11.24s

Which means submitting external_download requests for 15 files takes about 79 seconds, a bit longer than I expected, especially considering AsyncFirecrest is designed to reduce the processing time from the 7 seconds Firecrest takes. Could there be something I'm missing on my end? I'd appreciate any insights or suggestions you might have.

First script, which is not using async:

from firecrest import ClientCredentialsAuth, Firecrest
from queue import Queue
from time import monotonic, process_time 

client_id = '.'
secret = '.'
token_uri = "https://auth.cscs.ch/auth/realms/firecrest-clients/protocol/openid-connect/token"
fire_url="https://firecrest.cscs.ch"
_machine= "daint"#"eiger"

_client = Firecrest(
    firecrest_url=fire_url,
    authorization=ClientCredentialsAuth(client_id, secret, token_uri),
    )

remote_tree = '/scratch/snx3000/akhosrav/test-1mb/'#'/capstor/scratch/cscs/akhosrav/test-1mb/'
local_path =  '.' 
files = _client.list_files(_machine, remote_tree)

NN = 15

def getfiles(list_: list) -> None:
    q = Queue(maxsize=0)

    # Blocking calls: each request has to return before the next one is sent
    for item in list_[:NN]:
        down_obj = _client.external_download(_machine, remote_tree + str(item['name']))
        # local_path is a directory, so a separator is needed before the file name
        q.put([down_obj, local_path + '/' + str(item['name'])])

user_i = process_time()
real_i = monotonic()

getfiles(files)

user = (process_time() - user_i)
real = (monotonic()    - real_i)
print(f'waiting time (real minues user)  {real-user:.2f}s')
print('reguest rate [#/min]',f'{60*NN/(real):.2f}s')
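
The two metrics printed by the script can be wrapped in a small helper. This is a sketch using the same formulas as above (waiting time = real time minus CPU time, rate = 60 * N / real time); the measure function and the sleep-based workload are illustrative stand-ins, not part of the original test.

```python
from time import monotonic, process_time, sleep

def measure(func, n_requests):
    """Run func() once; return (waiting_seconds, requests_per_minute)."""
    user_start, real_start = process_time(), monotonic()
    func()
    user = process_time() - user_start
    real = monotonic() - real_start
    return real - user, 60 * n_requests / real

# Stand-in workload: sleeping counts as waiting, not as CPU time.
waiting, rate = measure(lambda: sleep(0.1), n_requests=3)
print(f"waiting time (real minus user) {waiting:.2f}s")
print(f"request rate {rate:.2f} requests/min")
```

Note that the rate is a count per minute, so the trailing "s" in the request-rate lines of the reported outputs is not a unit of seconds.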

Second script, using AsyncFirecrest:

import asyncio
from firecrest import ClientCredentialsAuth, AsyncFirecrest
from time import process_time, monotonic

client_id = '.'
secret = '.'
token_uri = "https://auth.cscs.ch/auth/realms/firecrest-clients/protocol/openid-connect/token"
fire_url="https://firecrest.cscs.ch"
_machine= "daint"# "eiger"

remote_tree = '/scratch/snx3000/akhosrav/test-1mb/'#'/capstor/scratch/cscs/akhosrav/test-1mb/'
local_path = '.' 

NN = 15

async def submit_download(name, queue, As_client):
    # Worker: keeps taking items from the queue until it is cancelled
    while True:
        item_ = await queue.get()
        down_obj = await As_client.external_download(_machine, remote_tree + str(item_['name']))
        queue.task_done()

async def main():

    As_client = AsyncFirecrest(
        firecrest_url=fire_url,
        authorization=ClientCredentialsAuth(client_id, secret, token_uri),
        ) 
    list_ = await As_client.list_files(_machine, remote_tree)

    queue = asyncio.Queue()

    for item in list_[:NN]:
        queue.put_nowait(item)

    tasks = []
    for i in range(4):
        task = asyncio.create_task(submit_download(f'worker-{i}', queue, As_client))
        tasks.append(task)

    await queue.join()

    for task in tasks:
        task.cancel()

    await asyncio.gather(*tasks, return_exceptions=True)

user_i = process_time()
real_i = monotonic()

asyncio.run(main())

user = (process_time() - user_i)
real = (monotonic()    - real_i)
print(f'waiting time (real minues user)  {real-user:.2f}s')
print(', reguest rate [#/min]',f'{60*NN/(real):.2f}s')

ekouts commented 7 months ago

Hi @khsrali,

Indeed there is something you can easily tweak. Try defining your client like this:

    As_client = AsyncFirecrest(
        firecrest_url=fire_url,
        authorization=ClientCredentialsAuth(client_id, secret, token_uri),
        ) 
    As_client.time_between_calls = {
        "compute": 0,
        "reservations": 0,
        "status": 0,
        "storage": 0,
        "tasks": 0,
        "utilities": 0,
    }

The asynchronous version of pyfirecrest has a mechanism with which you can set your preferred rate limit per microservice. By default I have set this to 5 seconds and indeed this makes everything unnecessarily slow. 😅 😬
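
To see why such a default dominates the timing, here is an illustrative sketch of a minimum-interval limiter. It is an assumption about the general mechanism, not pyfirecrest's implementation, and MinIntervalLimiter is a hypothetical name. With a 5-second interval, 15 requests to the same microservice need at least 14 × 5 = 70 seconds, which is roughly in line with the 79.72s measured above.

```python
import asyncio
import time

class MinIntervalLimiter:
    """Illustrative only: let one call through every `interval` seconds."""

    def __init__(self, interval):
        self.interval = interval
        self._lock = asyncio.Lock()
        self._last_call = None

    async def wait_turn(self):
        async with self._lock:
            if self._last_call is not None:
                remaining = self.interval - (time.monotonic() - self._last_call)
                if remaining > 0:
                    await asyncio.sleep(remaining)
            self._last_call = time.monotonic()

async def request(limiter, i):
    await limiter.wait_turn()
    return i  # a real client would send the request here

async def main(n, interval):
    limiter = MinIntervalLimiter(interval)
    start = time.monotonic()
    await asyncio.gather(*(request(limiter, i) for i in range(n)))
    return time.monotonic() - start

# Scaled down: 5 calls spaced 0.05s apart take about (5 - 1) * 0.05 = 0.20s.
elapsed = asyncio.run(main(n=5, interval=0.05))
print(f"{elapsed:.2f}s")
```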

Please let me know if the time of the async client improves.

PS. I add here the github account of Elia: @theely

khsrali commented 7 months ago

Hi @ekouts

Haha, this was certainly an improvement :) , thanks!

AsyncFirecrest

waiting time (real minues user)  8.75s
reguest rate [#/min] 100.94s

Firecrest

waiting time (real minues user)  7.75s
reguest rate [#/min] 115.16s

Actually, I was hoping that the async approach would really speed things up, theoretically aiming for around 0.5 seconds in total, given we have 15 files and the ideal time would be roughly 7.75 seconds divided by 15. Do you think it's possible that the server might be a bit slow or maybe there are some limitations in place? Just trying to figure out why it's not as quick as we hoped.

ekouts commented 7 months ago

Yes, I was also looking at the code in the morning and I was trying to figure out why it's not faster. It is coming from this imposed rate limit, because I am using some locks that end up serialising the requests per microservice. I could remove the locks when the rate limit is 0, but I don't want to completely remove the logic of it. I think I can fix it but I don't think I will have time for it this week.
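
A small sketch of the effect described here, independent of pyfirecrest: if a lock is held for the whole duration of a request, concurrent calls run one after another even when no extra delay is imposed. The function names and the 0.05s simulated request time are assumptions for illustration.

```python
import asyncio
import time

async def call_with_lock(lock, delay):
    async with lock:  # lock held for the whole simulated request
        await asyncio.sleep(delay)

async def call_without_lock(delay):
    await asyncio.sleep(delay)

async def timed(coros):
    start = time.monotonic()
    await asyncio.gather(*coros)
    return time.monotonic() - start

async def main(n=5, delay=0.05):
    lock = asyncio.Lock()
    locked = await timed([call_with_lock(lock, delay) for _ in range(n)])
    free = await timed([call_without_lock(delay) for _ in range(n)])
    return locked, free

locked, free = asyncio.run(main())
print(f"with lock: {locked:.2f}s, without: {free:.2f}s")
```

With the lock the total grows with the number of calls (about n × delay); without it, the calls overlap and the total stays close to a single delay.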

I will ping you as soon as I have something to try. But thanks again for the feedback, it has been already very helpful in improving the async client!

khsrali commented 7 months ago

Thank you very much, @ekouts! I appreciate your following up. Below, I've summarized the key points we've touched upon both here and in our recent meeting, for our future reference:

Efficiency:

We may consider reviewing the locking policy and discuss possibly increasing the rate limit. The goal is to reach a waiting time of (1/N)th of that of the non-asynchronous operations, so ~0.5 seconds in the above script.

Practicality:

It might be beneficial to offer users the option to interact with the lower level of asynchronous operation, for example:

loop = asyncio.new_event_loop()
loop.run_until_complete(main())
loop.close()

To enhance usability and avoid the need for users to instantiate multiple clients, we could allow users to decide whether to keep a session active or to terminate it after an asyncio.run(), as seen in httpx.AsyncClient().

Additionally, making simple_delete asynchronous or, alternatively, providing an option to control this behavior, e.g. simple_delete(..., block=True), might offer improved flexibility.

I think these adjustments could help our use of the async client in AiiDA. I look forward to hearing your thoughts and any further suggestions you might have. Thank you again!

ekouts commented 7 months ago

Hi @khsrali! We made a release yesterday with some of the changes we discussed in the issue, in case you would like to try it. You can find more details in #96 and #97 . We also reduced the default sleeping times in both clients. The last issue about performance in the async client is not addressed in the release. I will let you know when we have news on this.

khsrali commented 7 months ago

Thanks a lot @ekouts for the good news! looks good.

ekouts commented 5 months ago

Hi @khsrali , I just wanted to give you some updates about this issue.

We have made some improvements in #107. We still need to test it a bit more, so I will let you know when it's ready to be merged. We have removed the locks from most of the requests and have kept them only for the case that some requests can be merged (for example when there are many GET requests to /tasks).

Efficiency: ... The goal is to reach a waiting time of (1/N)th of non-asynchronous operations, so ~0.5 seconds in the above script.

There are some limitations on FirecREST's side that don't allow us to reach this at the moment.

khsrali commented 5 months ago

We have made some improvements in https://github.com/eth-cscs/pyfirecrest/pull/107. We still need to test it a bit more, so I will let you know when it's ready to be merged. We have removed the locks from most of the requests and have kept them only for the case that some requests can be merged (for example when there are many GET requests to /tasks).

Thanks a lot @ekouts for the heads up! I'm currently traveling; will definitely have a look at PR 107 in a few days and get back to you.

khsrali commented 5 months ago

Thanks a lot @ekouts

I understand and agree that the bottleneck is the rate limit of the FirecREST server itself. The main concern I have is that tasks appear to be completed one by one due to here and @_retry_requests, instead of running asynchronously. Before getting further into PR https://github.com/eth-cscs/pyfirecrest/pull/107, let me start with an alternative suggestion of how things could work in AsyncClient:


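import asyncio
import httpx

# fire_url, TOKEN, machine and files (a list of remote source paths) are assumed to be defined beforehand
NN = 200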

class AsyncClient:

    def __init__(self):
        self._session = httpx.AsyncClient(timeout=200.0)
        self.counter = 0

    async def _post_request(self, sourcePath):
        task = asyncio.create_task(
            self._session.post(
            url=f'{fire_url}/storage/xfer-external/download',
            headers={'Authorization': f'Bearer {TOKEN}', 'X-Machine-Name': machine},
            data={'sourcePath': sourcePath}))
        return sourcePath, task

    async def _run_tasks(self, tasks):
        results = await asyncio.gather(*[task for _, task in tasks])
        return {sourcePath: result for (sourcePath, _), result in zip(tasks, results)}

    async def send_download_req_careless(self, list_):
        self.counter += len(list_)
        tasks = [await self._post_request(sourcePath) for sourcePath in list_]
        results = await self._run_tasks(tasks)
        return results

    async def send_download_req_assure(self, results=None, retry_list=None):
        # Avoid mutable default arguments, which would be shared across calls
        results = {} if results is None else results
        retry_list = [] if retry_list is None else retry_list

        for sourcePath, response in results.items():
            if response.status_code == TOO_MANY_REQUESTS_CODE:
                retry_list.append(sourcePath)
            else:
                print(f"Success for {sourcePath} with status {response.status_code}")

        if retry_list:
            chunks = [retry_list[i:i + CLOGGING_LIMIT] for i in range(0, len(retry_list), CLOGGING_LIMIT)]
            results = {}
            for chunk in chunks:
                results.update(await self.send_download_req_careless(chunk))

            # Maybe one can implement an exponential backoff here
            # await asyncio.sleep(2 ** call_count)

            await self.send_download_req_assure(results, retry_list=[])

async def main():
    client = AsyncClient()

    await client.send_download_req_assure(retry_list=files[:NN])
    print(f"Total number of requests: {client.counter}")

CLOGGING_LIMIT = 200 
# This is the limit for the number of requests that can be sent in a short period of time

TOO_MANY_REQUESTS_CODE = 429

from time import process_time, monotonic

user_i = process_time()
real_i = monotonic()

asyncio.run(main())

user = (process_time() - user_i)
real = (monotonic()    - real_i)
print(f'waiting time (real minues user)  {real-user:.2f}s')
print(', reguest rate [#/min]',f'{60*NN/(real):.2f}')

With N=200 files, this gave me:

Total number of requests: 1305
waiting time (real minues user)  138.69s
reguest rate [#/min] 84.11

Note that the code above sent 1305 requests, meaning that the server answered 1105 of them with too-many-requests. This indicates that, if the rate limit were higher, the code above might be able to process the 200 requests with the FirecREST implementation as it stands right now in only about 20 seconds, instead of 140 to 150!

With pyfirecrest from https://github.com/eth-cscs/pyfirecrest/pull/107, on the other hand, I get:

waiting time (real minues user)  150.00s
, reguest rate [#/min] 79.58

And requests are processed one by one (why is still to be understood; it should not be this way). Even if the rate limit were hypothetically high, the gap to the result of the code above might get worse.

Either way, unfortunately, as of right now async programming won't produce better results than a regular client, because of the rate limit. For comparison, here is the outcome of a blocking call (no async):

waiting time (real minues user)  143.18s
reguest rate [#/min] 83.40

To keep it short: I think AsyncFirecrest could easily scale and be useful once FirecREST v2 is released, but only if it can send requests as a group, asynchronously. One can always handle failures as in the suggestion above.
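
The exponential backoff mentioned in the code comment above could look roughly like this. It is a sketch against a hypothetical FlakyServer stub that answers 429 a fixed number of times; the delays, the retry limit and the 200 success code are arbitrary assumptions, and no real FirecREST endpoint is contacted.

```python
import asyncio

TOO_MANY_REQUESTS_CODE = 429

class FlakyServer:
    """Hypothetical stub: rejects the first `failures` calls with 429."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    async def post(self, source_path):
        self.calls += 1
        await asyncio.sleep(0)  # yield control, as a real request would
        if self.calls <= self.failures:
            return TOO_MANY_REQUESTS_CODE
        return 200  # any non-429 status counts as accepted here

async def post_with_backoff(server, source_path, max_retries=5, base_delay=0.01):
    for attempt in range(max_retries + 1):
        status = await server.post(source_path)
        if status != TOO_MANY_REQUESTS_CODE:
            return status
        if attempt < max_retries:
            # Wait base_delay, 2 * base_delay, 4 * base_delay, ... between tries.
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"gave up on {source_path} after {max_retries} retries")

server = FlakyServer(failures=3)
status = asyncio.run(post_with_backoff(server, "/path/to/file"))
print(status, server.calls)  # 200 4
```

Backing off between attempts would reduce the number of rejected requests (1105 in the run above), though it cannot lift the server-side limit itself.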

If you would like to think about using that strategy in Asyncfirecrest, I'd be happy to talk about it in more detail during a Zoom call.

ekouts commented 5 months ago

Thanks a lot @khsrali ! I will go through your suggestion and take your offer for a Zoom call (I will schedule through an email). Just a very quick comment, my original idea on why the async client would be faster was because of the sleep times between the polling + the fact that normally you would be making concurrent requests to different microservices so you wouldn't be affected as much by the rate limit + the serialisation of the requests on the server side.

khsrali commented 5 months ago

Many thanks to @ekouts for our Friday meeting. As promised I'm logging here what we discussed for the record.

As it turned out, the code I provided in my previous comment is not much better than AsyncFirecrest. I originally thought this approach was better because of the serialization I saw in the responses. I initially attributed this to AsyncFirecrest, since I was receiving responses one by one. However, as I learned from Eirini, the serialization is done by the server when the same microservice is called. Essentially, the async client does well by sending requests almost simultaneously, but the server processes these requests one by one. Currently, each xfer-external/download call usually takes about 0.7s. Note that this ~0.7s is only the time it takes the server to accept your request. There is additional waiting time while the server is actually moving files to external storage.
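
As a back-of-the-envelope check of this explanation, assuming strictly one-at-a-time processing on the server at the approximate 0.7s per call quoted above (serialized_total is a hypothetical helper):

```python
def serialized_total(n_requests, seconds_per_request):
    """Total time if the server handles requests strictly one at a time."""
    return n_requests * seconds_per_request

for n in (15, 200):
    print(n, f"{serialized_total(n, 0.7):.1f}s")
# 15 10.5s
# 200 140.0s
```

The 200-request estimate of 140s is close to the 138s to 150s measured earlier in the thread, whichever client was used. The 15-file runs (about 7.75s to 8.75s) correspond to roughly 0.5s per call, so the per-call figure should be read as approximate.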

Given this limitation from the server side, the only gain is to save on additional sleep times during the URL retrieval stage, which is the time the client is idle while waiting for the server to stage files. Therefore, it seems nothing can be done to further asynchronise the initial request unless the server allows that.
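
A sketch of where that gain comes from, under the assumption that polling is done with a non-blocking sleep. StubTask is a hypothetical server-side task that becomes ready after a fixed time, and other_work stands for any unrelated request, for example to a different microservice.

```python
import asyncio
import time

class StubTask:
    """Hypothetical server-side task that is ready after `ready_in` seconds."""

    def __init__(self, ready_in):
        self._ready_at = time.monotonic() + ready_in

    def is_ready(self):
        return time.monotonic() >= self._ready_at

async def wait_until_ready(task, poll_interval=0.02):
    while not task.is_ready():
        await asyncio.sleep(poll_interval)  # idle time is given to other coroutines
    return True

async def other_work(delay=0.2):
    await asyncio.sleep(delay)  # stands for an unrelated request
    return "done"

async def main():
    task = StubTask(ready_in=0.2)
    start = time.monotonic()
    ready, other = await asyncio.gather(wait_until_ready(task), other_work())
    return ready, other, time.monotonic() - start

ready, other, elapsed = asyncio.run(main())
print(ready, other, f"{elapsed:.2f}s")
```

A blocking client would sleep through the polling and only then start the other work (about 0.4s here); with the async client the two overlap, so the total stays close to 0.2s.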

With this clarification, I think we can now close this issue.