pydantic / pydantic-settings

Settings management using pydantic
https://docs.pydantic.dev/latest/usage/pydantic_settings/
MIT License

Recommend a way to load/reload settings in async environment #479

Open bswck opened 2 weeks ago

bswck commented 2 weeks ago

This is how I load/reload my settings in an asynchronous app:

async def get_settings_async(*, force_latest: bool = False) -> Settings:
    """Get the application settings (return cached settings by default) without blocking the event loop."""
    settings = settings_var.get(None)
    if settings is None or force_latest:
        settings = await asyncio.to_thread(Settings)
        settings_var.set(settings)
    return settings

The reason I'm doing it this way is that the Settings model has toml_file specified in its model config, which causes the initializer to perform blocking disk I/O directly when resolving config values: https://github.com/pydantic/pydantic-settings/blob/6fe3bd1e35ff3854808f57e099484317f7063238/pydantic_settings/sources.py#L1993-L1996

Calls to open() are blocking and halt the entire event loop, possibly for longer than the short synchronous stretches between awaits in typical coroutines, which can lead to unexpected latency.

Let's create an API for loading configuration asynchronously, or hint in the docs (possibly here and here) that users should use asyncio.to_thread/loop.run_in_executor, so that the blocking I/O runs in a worker thread, wrapped in a future that is correctly awaited from a coroutine.

The big question is whether loading the config from sources is currently thread-safe (I'm guessing it is).

bswck commented 2 weeks ago

Here's a short proof of concept that I wrote for fun, to illustrate the problem in more depth. It's not meant to be realistic, just to show why blocking calls are harmful.

Imagine a task group with 10000 async tasks that each load the newest config from disk and then sleep for 0 seconds (and then check how long they actually slept). All those tasks are added to the task group one by one, letting the event loop run them concurrently:

```py
import asyncio
import time
from collections.abc import Awaitable, Callable
from concurrent.futures import ProcessPoolExecutor

from pydantic_settings import BaseSettings

max_sleeping_error = 0.0


class SomeSettings(
    BaseSettings,
    env_file=".env",
    toml_file="config.toml",
    extra="allow",
):
    pass


async def without_executor() -> None:
    # Simply load the settings, which is blocking.
    SomeSettings()


async def with_executor() -> None:
    # Load the settings in a worker thread, which isn't blocking
    # (creating a thread is blocking though).
    await asyncio.to_thread(SomeSettings)


async def measure_sleeping(
    callback: Callable[..., Awaitable[None]], sleep_for: float = 0.0
) -> None:
    global max_sleeping_error
    # The callback loads the settings.
    await callback()
    started = time.perf_counter()
    # This lasts longer and longer as more and more blocking calls occur.
    await asyncio.sleep(sleep_for)
    slept_for = time.perf_counter() - started
    max_sleeping_error = max(max_sleeping_error, slept_for - sleep_for)


async def run_test(callback: Callable[..., Awaitable[None]]) -> None:
    async with asyncio.TaskGroup() as tg:
        for _ in range(10000):
            tg.create_task(measure_sleeping(callback))


def test_without_executor() -> float:
    # Check how late asyncio.sleep() calls will be if we don't use an executor.
    asyncio.run(run_test(callback=without_executor))
    return max_sleeping_error


def test_with_executor() -> float:
    # Check how late asyncio.sleep() calls will be if we use an executor.
    asyncio.run(run_test(callback=with_executor))
    return max_sleeping_error


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        fut_with_executor = pool.submit(test_with_executor)
        fut_without_executor = pool.submit(test_without_executor)
        print("[with executor]")
        print("max sleeping error:", fut_with_executor.result(), end="\n\n")
        print("[without executor]")
        print("max sleeping error:", fut_without_executor.result())  # can even exceed 1s!
```

If you run that, you'll see that with the executor, even late tasks see the event loop stretch the wait for `await asyncio.sleep(0)` by at most around ~0.05 secs (a negligible delay), while without the executor, `await asyncio.sleep(0)` in a late task can lose about ~1.5 secs (at least on my machine)! And that's due to blocking calls still being performed in the preceding concurrent tasks!

hramezani commented 2 weeks ago

Thanks @bswck for the issue and the explanation.

Let's document it at the end of our docs, before the In-place reloading section.
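The docs note could boil down to a pattern like this (a sketch, with `Settings` standing in for any BaseSettings subclass that uses file-based sources):

```python
import asyncio


class Settings:
    """Stand-in for a BaseSettings subclass with file-based (blocking) sources."""


async def main() -> None:
    # Construct the model in a worker thread so the blocking file reads
    # happen off the event loop.
    settings = await asyncio.to_thread(Settings)
    assert isinstance(settings, Settings)


asyncio.run(main())
```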