apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0

Reset executor_config when clearing a task? #26568

Open dstandish opened 2 years ago

dstandish commented 2 years ago


Currently, if you clear a task, the executor config is not cleared. The executor config is pickled and added to the TI row at DAG run creation time by the dag processor, and when you clear a TI the state is updated but the executor_config remains unchanged.

In order to have the executor_config update properly, we may have to run the TI clear function as a dag processor callback. Alternatively, clear could in effect delete the TI row, and then ... well, what would happen? I think the scheduler would recreate it, but it doesn't even have access to the DAG, so how would it get the executor config?

@ash @jedcunningham


potiuk commented 2 years ago

What would be the reason to clear executor_config when we clear the task? Shouldn't it use the original config that was there at the time the DAG was parsed? :thinking:

potiuk commented 2 years ago

CC: @mhenc

dstandish commented 2 years ago

The thing is, @potiuk, it doesn't get updated when the DAG gets parsed again. Let's say your task had a bad exec config, so it fails. You update the code to fix it. Now you want to clear the task. The task will fail again, because the exec config will be unchanged.
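
The failure mode can be sketched with plain `pickle` (a toy model, not Airflow's actual code — the row layout and keys here are made up for illustration):

```python
import pickle

# The executor config is pickled into the task-instance row when the
# DagRun is created (hypothetical dict standing in for the DB row).
executor_config = {"KubernetesExecutor": {"image": "bad-image:typo"}}  # buggy config
ti_row = {"state": "failed", "executor_config": pickle.dumps(executor_config)}

# The user fixes the DAG file...
executor_config = {"KubernetesExecutor": {"image": "good-image:1.0"}}

# ...and clears the task: only `state` is reset, the pickled config is untouched.
ti_row["state"] = None

stale = pickle.loads(ti_row["executor_config"])
print(stale["KubernetesExecutor"]["image"])  # still the bad image
```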

potiuk commented 2 years ago

> The thing is potiuk it doesn't get updated when the dag gets parsed again. Let's say your task had a bad exec config. Then it fails. You update the code to fix it. Now you want to clear the task. The task will fail again because the exec config will be unchanged.

Right. That is a bummer indeed. I think in this case the consistent behaviour would be to serialize it not at DagRun creation but when the DAG is parsed. Otherwise we are entering the realm of DAG versioning, and that opens Pandora's box.

Is there anything "dag-run"-specific that makes it necessary to create the executor config at the moment the DagRun is created? Even if the executor config has some per-dag-run data, I think during DAG parsing we should serialize enough of it to be able to recreate the DagRun-specific one from the serialized form. I think that would be pretty consistent with the current "dag structure" behaviour?

dstandish commented 2 years ago

i think it's created at dag run creation simply because there's no task instance record until the dag run is created, and the TI is where it's stored.

i think "serializing" on dag parse makes sense. but yeah, the TI record won't be there yet. and we don't have a task table!

so the next available option would be to stick it in the serialized dag, where some information about tasks is stored. BUT, for better or worse, the exec config can be any pickleable python object, which means ... we probably can't store it in the serialized dag json...
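
The tension is easy to demonstrate with the stdlib (the `MyExecutorHint` class is a hypothetical stand-in for an arbitrary user-defined object):

```python
import json
import pickle

class MyExecutorHint:
    """Stand-in for an arbitrary user object put in executor_config."""
    def __init__(self, cpu):
        self.cpu = cpu

cfg = {"hint": MyExecutorHint(cpu=2)}

# Pickle handles it fine...
restored = pickle.loads(pickle.dumps(cfg))

# ...but it cannot go into the serialized-DAG JSON as-is:
try:
    json.dumps(cfg)
    json_ok = True
except TypeError:
    json_ok = False
print("JSON-serializable:", json_ok)
```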

potiuk commented 2 years ago

> so the next available option would be stick it in serialized dag, where there is some about tasks stored. BUT, for better or worse, exec config can be any pickleable python object, which means ... probably can't store it in the serialized dag json...

Might be a good time to actually serialize it "properly"? I know there are potentially K8S objects in there; that has caused us a number of problems, and customizable serialization was needed.

We actually discussed that today with @mhenc, and the fact that the K8S executor config is "pickled" is one of the complications of the Internal API - AIP-44. It does not block it, but I think it makes it rather complex: we think we will be able to serialize all the communication for the Internal API using openapi/jsonrpc, and likely we could avoid any "custom" serialization code.

We are trying out using Pydantic to do the serialization for us.

So maybe that will be a good opportunity to try it out (and maybe Pydantic can actually help with that). As mentioned in https://pydantic-docs.helpmanual.io/usage/exporting_models/

> pydantic can serialise many commonly used types to JSON (e.g. datetime, date or UUID) which would normally fail with a simple json.dumps(foobar).
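
The failure mode the pydantic docs describe is easy to reproduce with the stdlib alone (this only shows the `json.dumps` side; the claim that pydantic handles these types comes from the quoted docs, not from code here):

```python
import json
from datetime import datetime

# A plain json.dumps cannot handle datetime out of the box -- exactly
# the kind of value that shows up in executor configs and DB rows.
try:
    json.dumps({"ts": datetime(2022, 9, 21)})
    failed = False
except TypeError as exc:
    failed = True
    print("json.dumps failed:", exc)
```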

dstandish commented 2 years ago

So, as of recently, I made a change to run K8S pod objects through Airflow's serialization process (this gives us better stability when the kubernetes library changes and we unpickle an object pickled with an old version), so now K8S pod objects are serialized before ultimately being pickled. So I believe that from the K8S executor's perspective we would be OK if we didn't pickle. But in theory some other executor may rely on an arbitrary pickleable obj???
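
The idea of serializing before pickling can be sketched like this (the `V1PodStandIn` class and its `to_dict` method are hypothetical stand-ins for a `kubernetes.client.V1Pod`-like object, not the real library):

```python
import pickle

class V1PodStandIn:
    """Stand-in for a k8s client object (hypothetical)."""
    def __init__(self, name, image):
        self.name = name
        self.image = image

    def to_dict(self):
        return {"name": self.name, "image": self.image}

pod = V1PodStandIn("worker", "python:3.10")

# Pickling the live object embeds the library's class layout, so unpickling
# after a library upgrade can break. Pickling a plain dict avoids that:
blob = pickle.dumps(pod.to_dict())
restored = pickle.loads(blob)
print(restored)
```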

potiuk commented 2 years ago

I think in this case we could stay on the current path: if an executor config fails to serialize (either with our serializer or any other), we could leave it empty at parsing time and create it at dag-run time. This would make it backwards compatible, but in order to make the config clearable (or to use it with the Internal API) the user would have to make it serializable.

Ticks all the boxes for me.
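
A minimal sketch of that fallback (the helper name is hypothetical, and plain `json`/`pickle` stand in for whatever serializer is chosen):

```python
import json
import pickle

def store_executor_config(config):
    """Serialize at parse time if possible; otherwise fall back to
    pickling, preserving today's behaviour for opaque configs."""
    try:
        return ("json", json.dumps(config))
    except TypeError:
        return ("pickle", pickle.dumps(config))

kind_ok, _ = store_executor_config({"cpu": 2})          # parse-time path
kind_bad, _ = store_executor_config({"obj": object()})  # dag-run-time fallback
print(kind_ok, kind_bad)
```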

dstandish commented 2 years ago

you are suggesting that if the executor config is JSON-serializable, we store it in the serialized dag, and if not, fall back to pickling it in TI.executor_config? one thing about "JSON-serializable" is that our serializer falls back to just str(obj), so the caller doesn't really know if it was an unrecognized type. i.e. any object type that isn't explicitly handled gets handled this way:

        else:
            log.debug('Cast type %s to str in serialization.', type(var))
            return str(var)

so how do we know if the object is serializable?
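
One way to see the problem: mimic the fallback with a toy `serialize` (not Airflow's actual function) and note that the caller only gets a string back, with no signal that information was lost:

```python
import json

def serialize(var):
    """Toy version of the quoted fallback: anything json can't handle
    silently becomes str(var)."""
    try:
        json.dumps(var)
        return var
    except TypeError:
        return str(var)

class Unknown:
    pass

kept = serialize({"a": 1})   # round-trips unchanged
lost = serialize(Unknown())  # silently degraded to an opaque string
print(type(kept), type(lost))
```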

potiuk commented 2 years ago

> so how do we know if the object is serializable?

Wild thought - I have not verified it, just thinking out loud.

We do not have to use our serialization; I think we mostly implemented it (back when we implemented it) because there was no better alternative. Maybe we can try a different serialization (the Pydantic mentioned there might be a good candidate). Pydantic works mostly based on type hinting and figures out what to do from that - thanks to which it is fast - and "data validation" is precisely what Pydantic is all about. It was created for that. All of the modern API frameworks in Python already use it under the hood to convert Python objects into serializable data and back.

I have a hunch (I have not tried it) that it will support serializing K8S objects out of the box (or someone has already written an extension to do so), and that it should be easy to plug in.

One more thing: Pydantic has a firm (and financially backed) plan to implement Pydantic 2 - https://pydantic-docs.helpmanual.io/blog/pydantic-v2/ - which is already mostly done, with a ton of (mostly performance) improvements, like making JSON parsing 16 times faster thanks to a Rust-based, super-fast validating JSON parser. We could benefit from that when it's ready - standing on the shoulders of giants :).

Maybe it's time to see if it would work? Not sure about it before we try, but it might be an interesting solution (we are looking at it with @mhenc to see if it can be easily used in our context).

dstandish commented 2 years ago

Took a quick look at it

[screenshot]

dstandish commented 2 years ago

It seems very, very odd to me that the kubernetes client library does not support conversion to and from JSON and YAML, given that YAML is like... THE way to define a k8s object?!? Why is that?

that's why, when trying to address that unpickling-across-k8s-versions issue, i ended up using our serializer - k8s doesn't support this conversion out of the box.

potiuk commented 2 years ago

:facepalm:

potiuk commented 2 years ago

So... let's add a "serializable" flag to ExecutorConfig :). Less automated, more backwards compatible, and we can use any serialization we want (ours).
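
A rough sketch of what such a flag could look like (the `ExecutorConfig` dataclass here is hypothetical - the thread only proposes the flag, not this shape):

```python
from dataclasses import dataclass, field

@dataclass
class ExecutorConfig:
    """Hypothetical shape of the proposed opt-in flag."""
    config: dict = field(default_factory=dict)
    # When True, the scheduler may re-serialize the config at parse time
    # (and reset it on clear); when False, it is pickled as today.
    serializable: bool = False

cfg = ExecutorConfig(
    config={"KubernetesExecutor": {"image": "python:3.10"}},
    serializable=True,
)
print(cfg.serializable)
```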