inngest / inngest-py

Python SDK for Inngest: Durable functions and workflows in Python, hosted anywhere
https://www.inngest.com/docs/reference/python
Apache License 2.0
27 stars 6 forks source link

Rich pydantic-type return values support #68

Open slep0v opened 6 months ago

slep0v commented 6 months ago

Hello, currently functions that return pydantic models (and I guess any json-serializeable object) get silently dumped/serialized into primitives.

I wonder if it is possbile to implement the logic using correct deserialization, so that you still use json for step memoization, but step.run function returns original type.

Because having to manually serialize an object before returning it (otherwise there is a confusion when step call with return type -> SomePydanticModel silently returns dict) and then deserialize results in-between the steps could add some redundant code.

To do that, we could try to access function annotations via the @task decorator wrapping proposed in #63

goodoldneon commented 5 months ago

Hello, currently functions that return pydantic models (and I guess any json-serializeable object) get silently dumped/serialized into primitives.

I didn't realize that but you're right 😄. It works as a side effect of our internal Pydantic usage, however type-checking should fail if a step.run returns a Pydantic object

I wonder if it is possbile to implement the logic using correct deserialization, so that you still use json for step memoization, but step.run function returns original type.

You mean by using typing.get_type_hints to get the return type of the step.run callback? That would work with an explicit return type:

def get_user() -> User:
    return User(id=1, name="John Doe")
user = await step.run("do-thing", get_user)

But reflection can't know about an implicit return type:

def get_user():
    return User(id=1, name="John Doe")
user = await step.run("do-thing", get_user)

And what would we do if it returns a union of Pydantic models?

def get_user() -> User | Admin:
    return User(id=1, name="John Doe")
user = await step.run("do-thing", get_user)

We absolutely want to make Pydantic-based validation a first-class citizen in the SDK (Pydantic is amazing), but there are a lot of questions about how to effectively do that. We've had similar discussions with Zod in our TypeScript SDK

slep0v commented 5 months ago

typing.get_type_hints

Yes, I think that could be a solution, but as I see it, only if we define a step function statically with a special wrapper, so it knows how to deserialize step.run basic types output back into Pydantic models (or even other custom classes that have __init__ method that would take **dict to build an object).

From the wrapper concept that I proposed in #63, we have an access to the initial function we're wrapping so we could store return types and then deserialize. If none present - fallback to the default primitive type behaviour.

@staticmethod
    def task(name: Optional[str] = None):
        """ Wraps a function to be executed as an Inngest step further. """
        def decorator(func):
            async def wrapped(self, *args, **kwargs):
                task_name = name or func.__name__
                return_type = typing.get_type_hints(func).get("return")  # <---

                primitive_result = await self.step.run(task_name, lambda: func(self, *args, **kwargs))
                result = maybe_convert_to_rich_object(primitive_result, return_type)  # <---

                return result

            return wrapped

        return decorator

Otherwise (in dynamic step calls) I think it will be really hard to restore an object during memo retrieval stage. We'd have to grab a return object type when a step initially calculated and store it somehow for later object loading. This could be done via model schema dump / build, but as far as I know Pydantic does not have such option yet.

goodoldneon commented 5 months ago

We could definitely do this with step.run as-is. We'd make this change to step.run (here):

if not isinstance(memo, types.EmptySentinel):
    model = typing.get_type_hints(handler).get("return")
    if issubclass(model, pydantic.BaseModel):
        return model.model_validate(memo.data)

This would work well when handler (a.k.a. the step.run callback) is explicitly typed, but would return a dict when it isn't.

As far as I can tell, we can't express the "return type changes based on explicit return types" behavior in types. That leads a major footgun where step.run's return type is a Pydantic object but at runtime it's a dict

slep0v commented 5 months ago

But will this work in case handlers are passed as closures using lambda?

def step_func(param1: type1, param2: type2) -> ReturnType
    return ReturnType(param1, param2)

val1, val2 = ...

final_result = step.run("step-name", lambda: step_func(param1=val1, param2=val2))

In this "dyamically-defined step" case lambda is not typed, so you won't be able to extract return type from it. And at least for me when I build pipelines, it is very common that steps have input values from previously executed steps.

goodoldneon commented 5 months ago

It won't work with lambdas since they're implicitly typed. That said, the downside for lambdas is the same downside for implicitly typed functions. It's a really big footgun that would apply to both the object-oriented approach you proposed and the existing functional approach.

A separate issue is union return types. Let's say that your function returned Admin | User, where those models are defined as:

class Admin(pydantic.BaseModel):
    id: int
    name: str
    role: typing.Literal["admin"] = "admin"

class User(pydantic.BaseModel):
    id: int
    name: str

We could iterate over the union members and attempt to instantiate each Pydantic model, returning the first model that passes validation. But that can lead to unexpected behavior if you don't properly discriminate the union.

For example, let's say you had this step.run handler:

def step_handler() -> Admin | User:
    return User(id=123, name="Alice")

If we iterated over the union members, we'd actually instantiate an Admin model since {"id": 123, "name": "Alice"} is valid for both Admin and User

goodoldneon commented 5 months ago

I also wanna mention that runtime model validation can be tricky if your schema changes over time. Like if you add a new required field to your model and then deploy, active runs (whose memoized value adheres to the old model) will start failing. We want to make validation more first-class in Inngest (Pydantic in Python and Zod in TypeScript), but we need to think about it more before implementing something.

In the meantime, the current approach of roll-your-own model validation is flexible enough to let you transform the data however you need before validating. It's more verbose, but we want to avoid adding a bad abstraction -- we want to get it right

slep0v commented 5 months ago

I think the gotchas of union usage should be carefully avoided by users and be out of your scope. As it is a default behavior of Pydantic itself.

from pydantic import BaseModel

class A(BaseModel):
    x: int

class B(BaseModel):
    x: int

class C(BaseModel):
    inner_attr: A | B

d = {"inner_attr": {"x": 2}}
C(**d)  # >>> C(inner_attr=A(x=2))

Returning to the initial-type-restoration topic:

Yes, you are right that lambdas are usually rare. But in case of defining flows with your package, they appear in each step.run invocation if user wants to predefine functions to be used as step handlers and then use them:

def foo(*args):
    return ...

def bar(*args):
    return ...

init_args = ...
foo_result = step.run("id1", lambda: foo(*init_args))
bar_result = step.run("id2", lambda: bar(*foo_args))

The code is very clear and follows pretty common (as far as I know) approach - you define necessary functions that process your data first (in the same or another file) and then utilize those functions. But in current design you have to wrap everything in lambdas (i.e. create closures inplace).

Without lambdas you must define a handler as a closure that acts on variables in the scope, not on input arguments, after a previous step is executed:

init_args = ...

def foo():
    # access `init_args` defined in the same scope with `foo`
    return ...

foo_result = step.run("id1", foo)

def bar():
    # access `foo_result` defined in the same scope with `bar`
    return ...

bar_result = step.run("id2", bar)

For me this makes any non-trivial pipeline way harder to read and maintain. Moreover it dictates me how I should write my code - putting me into a stiff framework.

I can't speak for others, but as I see, it would be hard for people that previously used airflow/prefect/wrote custom pipelines flow logic/come from ML to transit to Inngest if they'd have to stick to the option # 2

goodoldneon commented 5 months ago

Oh, I totally agree that users shouldn't be required to use your second example. Your first example is 100% fine -- no issues with that. I think I removed the "lambdas aren't common" part of my message after you started responding 😄.

What are your thoughts on a utility function that handles validation? Something like this:

ModelT = typing.TypeVar("ModelT", bound=pydantic.BaseModel)

def validate_output(
    output: typing.Mapping, model: type[ModelT]
) -> ModelT:
    return model.model_validate(output)

Then you could do this:

class User(pydantic.BaseModel):
    id: int
    name: str

def _get_user(user_id: int) -> User:
    return User(id=user_id, name="Alice")

@client.create_function(
    fn_id="my-fn",
    trigger=inngest.TriggerEvent(event="my-event"),
)
async def fn(
    ctx: inngest.Context,
    step: inngest.Step,
) -> str:
    user = validate_output(
        await step.run("get-user", lambda: _get_user(1).model_dump()),
        User,
    )

    return user.name

It's more verbose but it's also more type safe. It also has the advantage of being less rigid, allowing you to add custom transformations in between the "get memoized output" and "validate output into model" calls, like this:

user = validate_output(
    convert_user_v1_to_v2(
        await step.run("get-user", lambda: _get_user(1).model_dump())
    ),
    User,
)
slep0v commented 5 months ago

This could work, but I think that this may be an overengineering and "responsibility intertwining", as the original topic is the "unexpected return behaviour of step.run".

What I'm trying to address is the issue "I explicitly stated in my code that my function returns T and I want that after step.run I still get T".

I think that cases like

transformations in between the "get memoized output" and "validate output into model" calls

should not be covered because optimistically user should be agnostic of package inner structure to be able to work with it.

In the context of a pipeline (or task flow) if we want to act on a task result (convert_user_v1_to_v2), well, this is an additional task/transformation in the flow - this is the responsibility of a user. And in this this particular example transformation should take UserV1Model, not a dict, because user knows that this function will act upon previous step result that was explicitly defined to return a UserV1Model.

But providing an intuitive execution logic - this is a package responsibility. And it almost is, except this typing thing :)


For the current design I think it will be enough to pass an additional output_model param to the step.run. But I'd prefer sticking to a static @task decorated function, which, I understand, you need a time to consider.

goodoldneon commented 5 months ago

the original topic is the "unexpected return behaviour of step.run"

The ability to return Pydantic models is a bug since we don't officially support that right now. There should be a type error for it, but we mistakenly allowed it at runtime. We might release a fix where we raise an error when a Pydantic model is returned, but we'll have to decide if the fix is worth breaking code.

I think it will be enough to pass an additional output_model param to the step.run

That's a possibility (though it'll require 2 additional function overloads here). Adding output_model would mean you could do this:

user = step.run("my-step", lambda: User(id=123), output_model=User)

Instead of this:

user = User.model_validate(step.run("my-step", lambda: User(id=123).model_dump()))

While the latter is a little more verbose, it's also more flexible. One of our north stars is "build primitives and let usage dictate abstractions". In other words, we like to enable advanced usage with flexible (but sometimes verbose) primitives and get a lot of user feedback before making the advanced usage first-class.

A good example of this approach is our recent step.invoke method. Users were combining step.wait_for_event and step.send as a "call a function and wait for a response" pattern. This usage heavily informed the way we built step.invoke, and now our users are happy with the abstraction.

We'd love to observe how our users combine Inngest with Pydantic. Once we're able to gather a variety of nuanced usage patterns, we'll be able to effectively make Pydantic a first-class paradigm in our SDK