keleshev / schema

Schema validation just got Pythonic
MIT License
2.86k stars 212 forks source link

Support for async validator #201

Open thehesiod opened 5 years ago

thehesiod commented 5 years ago

Would be nice if there was an async def async_validate or something similar so you could have a coroutine as a validate method.

ljluestc commented 11 months ago
import asyncio
from schema import Schema, SchemaError

async def async_validate(schema, data):
    try:
        schema.validate(data)
    except SchemaError as e:
        raise e

async def main():
    x = Schema({"foo": int, "bar": str})

    try:
        await asyncio.gather(
            async_validate(x, {"foo": "lol", "bar": 1}),
            async_validate(x, {"foo": 42, "bar": "hello"})
        )
    except SchemaError as e:
        print("Validation error:", e)

if __name__ == "__main__":
    asyncio.run(main())
thehesiod commented 11 months ago

not as an outer, as an inner, ex: Use(coroutine())

ljluestc commented 10 months ago
import asyncio
from schema import Schema, SchemaError, Use

# Define an async validation function
async def async_validate(schema, data):
    try:
        schema.validate(data)
    except SchemaError as e:
        raise e

# Define a coroutine validation function for Use
def coroutine_validator(coro_func):
    async def inner_validator(value):
        await coro_func(value)
    return inner_validator

def main():
    x = Schema({"foo": int, "bar": str})
    x = x.validate(Use(coroutine_validator(async_validate)))

    try:
        asyncio.run(asyncio.gather(
            x.async_validate({"foo": "lol", "bar": 1}),
            x.async_validate({"foo": 42, "bar": "hello"})
        ))
    except SchemaError as e:
        print("Validation error:", e)

if __name__ == "__main__":
    main()