apache / airflow

Apache Airflow - A platform to programmatically author, schedule, and monitor workflows
https://airflow.apache.org/
Apache License 2.0
37.48k stars 14.37k forks source link

Consolidate "Serialization" code #40974

Open kaxil opened 4 months ago

kaxil commented 4 months ago

We have different ways to do Serialisation in Airflow --- and also multiple way of pickling (dill, stdlib pickle, cloudpickle).

Would you like to add additional color here @bolkedebruin @amoghrajesh

kaxil commented 4 months ago

cc @gyli since you waned to take this issue. Could you add a comment to this GitHub issues, please? Your name only shows up for me to add in Assignees once you have a comment on it.

pedro-cf commented 4 months ago

~~Not sure if related, but there is issues with serializing context for some Python Operators (ex: PythonVirtualEnvOperator) Perhaps this work could potentially solve this issue? @kaxil~~


Edit: New PR 41049 fixes this.

gyli commented 4 months ago

Hi @kaxil , I have been reviewing serialization codes recently, while I haven't noticed any significant parts that need to be consolidated, especially after the improvement of lazy load serialization module. The other issue mentioned above https://github.com/apache/airflow/issues/7870, is already completed. So we need more clarification from @bolkedebruin what needs to be consolidated in his proposal.

kaxil commented 4 months ago

cc @uranusjr who might also know about it since he worked with Bolke on it

kaxil commented 4 months ago

I remember different ways of Serialization used for Xcom, Airflow Webserver/ORM but Bolke or TP can add more specific details:

https://github.com/apache/airflow/blob/14e9759f6a85776772ab7d08a372c290c0380563/airflow/models/dagpickle.py#L45

https://github.com/apache/airflow/blob/14e9759f6a85776772ab7d08a372c290c0380563/airflow/utils/cli.py#L269-L278

https://github.com/apache/airflow/blob/14e9759f6a85776772ab7d08a372c290c0380563/airflow/providers/http/triggers/http.py#L106

https://github.com/apache/airflow/blob/14e9759f6a85776772ab7d08a372c290c0380563/airflow/providers/cncf/kubernetes/decorators/kubernetes.py#L103

https://github.com/apache/airflow/blob/14e9759f6a85776772ab7d08a372c290c0380563/airflow/models/xcom.py#L673-L697

https://github.com/apache/airflow/blob/14e9759f6a85776772ab7d08a372c290c0380563/airflow/utils/json.py#L46

potiuk commented 4 months ago

The problem as I understand it is tha we currently have two serializations:

1) https://github.com/apache/airflow/blob/main/airflow/serialization/serialized_objects.py -> this one uses "old" serialization where you have giant if/else clause where we have to manually add new type of objects to serialize

2) https://github.com/apache/airflow/blob/main/airflow/serialization/serde.py -> which is a pluggable and presumably way faster way of serializing. It uses "pluggable" https://github.com/apache/airflow/tree/main/airflow/serialization/serializers modules and I believe you can also add your own serializers and implement "serialize/deserialize" methods in your objects. It has no giant if, providers could potentnially register their own serializers etc. etc.

Some of our code uses 1) - some uses 2). There are a few problems there - for example 2) does not yet support DAG serialization, and it also is not used for example in internal-api (but internal-api will be gone in Airlfow 3).

There are also other places where we use other serialization mechanisms (dill, cloudpickle) and there is a potential tha we could consolidate that as well.

Ideally (this was the long term vision of @bolkedebruin) we should have "one to rule them all" - 2) should become universal serializer used by everything.

bolkedebruin commented 4 months ago

Ha! There are some things to consider when wanting to do "consolidation". The main leading principle in the past was that we do not want to have executable code when deserializing. This is what all third-party (de)serializers seem to do, pickle, dill, cloudpickle etc. The second principle was to have a human readable format and the third principle was to have it versioned.

I've added "serde" (no 2. that @potiuk is speaking of in the past) to have a generic way of serializing any object with the principles in mind. This is particularly useful for XCom as that shares arbitrary data across workers. The 'other' serializer which I would call the DAG serializer has three main short-comings: 1) It is slow - serde takes about 10% of the time the DAG serializer takes, 2) it is hard to extend, you would need to change the core code to add an extension and 3) it will add O(n) in time to do so. The upside is that it is tried and tested, serializes DAGs and does JSON schema validation.

It might then seem the obvious route to add DAG serialization to "serde". Which it did try, but also felt a bit like squeezing something into something else where it doesnt entirely fit (keeping backwards compatibility in mind). It is possible, but a lot of past cruft would need to be re-implemented to make it work.

Now I see other projects like Spark Connect settle on Cloudpickle and they forego the issue of arbitrary code execution. The question then becomes how relevant is that attack vector? Is the tradeoff in maintaining our own serializer worth it? Also it will not generate a human readable format. Do we need to review our principles (which I think have never been officially settled, but correct me if I am wrong).

Concluding: if you add DAG seralization to "serde" it is probably the most Airflow way to go. It gives you extensibility for the future as it has the better format (over the DAG serializer) and can with a little bit of help serialize any kind of object. It seems to serve us well nowadays. If you take a step back and want to re-evaluate it might be worth re-visiting our principles and checkng what we can do to reduce the attack vector and maybe go for cloudpickle. This would externalize the support and reduce the maintenance burden. However, we might run into issues when we cannot serialize through cloudpickle and we do not control how it works.

(I'll add some additional notes later that are historically important, but slightly less relevant).

My 2 cents.

potiuk commented 4 months ago

Also (to add to what @bolkedebruin wrote ) in internal-API we used Pydantic for some of that - mostly because it alows to serialize ORM SQLalchemy objects together with their relations. But this is possible to plug-in in serde as well as another serializers and treat it similarly to "to_dict" or "to_json"..

Also I think there was a lot of misunderstandings and clashes (and I am one of the guilty ones there) about which and why and how we are using serializers - precisely because we have not agreed on a common strategy we want to take, direction and vision we have. It has never been agreed and writen down, and I think we should start with that.

For example right now when I look at serde - it's a really good idea and if we extend the concept of it to be able to provide seriaizers from respective providers and make it support all that we need in "core" airflow, that could be a good idea for example (but we had disagreements on that in the past - again mostly because we had no consolidated vision and because in many cases it was just easier and faster (and without the whole picture and vision set) to use another approach than common. And possibly that was even good because that would in some cases break compatibility too much for some Airlfow 2 cases.

But with Airflow 3, I'd be personally for making serde default and incorporate the 'DAG' serializer (after playing a bit with both) and making it the default engine and plugging in whatever serializer implementations are good. That should of course include all the compatibility and migration pieces as needed. We could also possibly distribute the current serde modules (deltalake, iceberg etc - as "pluggable" / external serializers in providers).

That would be my 1c .

gyli commented 4 months ago

Thanks for the explanation! There are 3 points that I really agree, and can explore in this issue:

  1. Consolidating DAG serializer and serde by adding DAG serializer into serde.
  2. We will need a full evaluation on Cloudpickle. Other than maintainability, full control of the output, human readable format, what are the other pros & cons.
  3. It would be great to move serde modules to providers, and we can explore that in this issue.
bolkedebruin commented 4 months ago

Sounds good. Although I don't understand your point 1 fully. Yes to consolidation, but "making airflow modules select serialization engines" I do not understand. Care to the clarify?

gyli commented 4 months ago

I've edited my language in my comment above. I was thinking about supporting the old DAG serializer and the consolidated version at the same time for smoother migration, while I also feel it could be unnecessary if the serialization format is not changed. I'm currently thinking adding DAG and operator serializer under serde, that still calls methods from serialized_objects, and making serde as the only entrypoint for all serialization gradually. Does it sound good to you?

potiuk commented 4 months ago

Also as mentioned in https://github.com/apache/airflow/issues/31213 -> it might be a good exercise to migrate AIP-44 for Airflow 2.10 to use serde eventually. While AIP-44 will be experimental - only in 2.10 (and possibly 2.11), I think it might play a major role in preparing our users to migrate (and test isolation before) to Airflow 3 with AIP-72 rewrting the isolation code from scratch, so this effort is not lost.

bolkedebruin commented 4 months ago

@gyli It might be possible, but note there are a lot of assumptions within the DAG serializers about the output format. I'm not sure it is worth the effort to re-use the old serializer code. Serde has a strict schema / format (versioning specification), while the serialized objects of the DAG serializer do not.

In your place I would focus on 'on-the-wire' compatibility so that the serialized format ends up compatible with the JSON schema used. This will require a second pass serializer.

As @potiuk mentioned ensuring AIP-44 doesn't use the legacy path any more might be an easier way to get used to the code and has more benefits. The DAG serializer is quite contained now except by its use of AIP-44.

gyli commented 4 months ago

Yeah I can start from https://github.com/apache/airflow/issues/31213 first.

Note that by doing so, internal_api_call has to temporarily call either old serializer or serde depending on the argument type, because the the API argument could be DAG or operator, which is currently not supported by serde. It can only be completely switched to serde once the serializer consolidation is done.

potiuk commented 4 months ago

I am not sure we allow DAG or Operator to be passed over the internal-api. I think it should not be the case and we should likely change it. I will review it shortly as well.

gyli commented 4 months ago

Here are some internal API endpoints I found with DAG/operator arguments: https://github.com/apache/airflow/blob/6684481c67f6a21a72e7f1512b450a433c5313b5/airflow/cli/commands/task_command.py#L163 https://github.com/apache/airflow/blob/f9c56cb321a1c985736ec888f1e37275f646867e/airflow/models/taskinstance.py#L910

potiuk commented 4 months ago

I will take a look today :)

gyli commented 4 months ago

@bolkedebruin Still thinking about the best approach here. The tricky part is how to handle different encoding if old code is used indeed. While even if we decide not to use the old codes, we will still need to implement classes similar to SerializedBaseOperator, SerializedDAG, DependencyDetector, etc. in serde, with mainly the serialize and deserialize methods replaced with serde methods.

Approach 1, calling serialized_objects from serde, and keeping the old output format for DAG and operator. This approach requires minimum changes, and existing jsonschema can be reused directly. While the cost is, both encoding format will be used.

Approach 2, porting SerializedBaseOperator, SerializedDAG, DependencyDetector classes into serde, and replacing serialize and deserialize methods with serde's. A new jsonschema file is needed as the encoding will be changed. Another reason to port those classes instead of creating a new serde serializer module is, dag serialization is recursive. This approach will make serde.py codes much longer, probably similar to the length of current serialized_objects.py eventually.

I gave a second thought and second approach makes more sense to me. Although it requires more work, we might want a complete migration from old to new serializer considering this is an Airflow 2 to 3 change. Please let me know what you think.

bolkedebruin commented 4 months ago

I'm mostly AFK - Olympics etc - so a bit brief and can't really deep dive into the code.

I think targeting a new schema, your approach 2 is best, while for deserialization having detection of the old format and using the old deserializers for some time is smart. We do want people to be able to move from Airflow 2 -> 3 but I don't think we require them to entirely start from scratch.

Having the updated schema be as close to serde's native output format, basically dicts with classnames and versioning information, requires the least passes so will be the fastest. It's also easier then so switch to another output format (say protobuf) if we would like that in the future.

It might be worthwhile to have a mix of a serde module and porting of the classes. Classes are slow, the overhead on the stack is significant. This is one of the reasons why serde does what it does. Another reason is that the deserialized versions of DAG and Operator are not the same as DAG and Operator. So it night be cleaner to keep them separated. Another one is that the "serialize" and "deserialize" methods in serde's modules have priority over the ones in the classes, so that allows you to keep the existing infrastructure in place while replacing it with serde's.

Summarizing my thoughts:

For now my thoughts. Thanks for taking this on. I'll help where I can.

kaxil commented 3 months ago

@gyli : Could I assign this task to you? Do you have enough clarity?

gyli commented 3 months ago

Hi @kaxil, yes, please assign it to me. It's progressing slowly though.