laysakura / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request]: Design adequate yet flexible ways to serialize/deserialize DoFn payloads #15

Open sjvanrossum opened 1 year ago

sjvanrossum commented 1 year ago

What would you like to happen?

During pipeline execution, the bundle processor retrieves a process bundle descriptor, which contains descriptors for transforms, collections, windowing strategies, coders and other objects relevant to executing the pipeline. Reconstructing the pipeline graph and runtime types on a worker might have unintended side effects due to variations in the execution environment (e.g. hostname, RNGs, user names, etc.). The graph and all its types as they existed at pipeline construction time must be exactly replicable at pipeline execution time. The FnAPI BeamFnControl service enables FnAPI SDKs to retrieve serialized representations of all relevant objects to make this work.

Rust does not support handy features such as Java's wildcard generics for treating all generic trait implementations as equal types, nor does stable Rust ship with a usable RTTI system that could facilitate (de)serialization. We thus require ways to specify typed and untyped views of all objects. For example, take this oversimplified DoFn:

use std::any::Any;

trait TypedDoFn {
  type I: Any;
  type O: Any;

  fn process_element(&mut self, element: &Self::I) -> Self::O;
}

trait UntypedDoFn {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any>;
}

struct MyDoFn;

impl TypedDoFn for MyDoFn {
  type I = String;
  type O = String;

  fn process_element(&mut self, element: &Self::I) -> Self::O {
    element.to_uppercase()
  }
}

We can either provide a blanket UntypedDoFn implementation for all TypedDoFns, like so:

impl<T: TypedDoFn> UntypedDoFn for T {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
    // Downcast the erased input to the concrete type, run the typed DoFn, box the output.
    Box::new(self.process_element(element.downcast_ref::<<Self as TypedDoFn>::I>().unwrap()))
  }
}
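
For illustration, once that blanket implementation is in place a worker-side harness can drive any DoFn purely through the type-erased view. A minimal sketch, assuming the traits and imports above (the run function is illustrative, not an existing API):

fn run(dofn: &mut dyn UntypedDoFn, input: Box<dyn Any>) -> Box<dyn Any> {
  // The harness never names the concrete input/output types; it only shuttles boxed values.
  dofn.process(input)
}

fn main() {
  let mut dofn = MyDoFn;
  let output = run(&mut dofn, Box::new("hello".to_string()));
  assert_eq!(output.downcast_ref::<String>().unwrap(), "HELLO");
}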

No issues so far, until we need to serialize the DoFn and deserialize it on another machine. Setting aside that the serde crate is the most likely pick for serialization/deserialization and that an efficient binary format like bincode would be preferred, how do we know which struct type the serialized data represents and which UntypedDoFn vtable is required to operate on it? Type registries with registered deserializers do not ship as part of the runtime as they do in Java, for example.

There are a fair number of crates which solve this, such as bevy_reflect, typetag and serde_traitobject. Both bevy_reflect and typetag (stable features) solve it using type registries, with Bevy requiring users to create and register types manually, whereas typetag relies on the inventory and ctor crates to build the registry automatically using "life before main". Lastly, serde_traitobject (nightly features) solves it by encoding the struct and trait type ids into the serialized data, which offers the most flexibility since explicit type registration (which has its own downsides) is not necessary.
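
To make the typetag approach concrete, here's a minimal sketch assuming serde (with derive), serde_json and typetag as dependencies; the trait and struct names are illustrative, and JSON is used because typetag generally requires a self-describing format rather than something like bincode:

use serde::{Deserialize, Serialize};

// typetag builds a global type registry at startup (via inventory/ctor) so that a
// Box<dyn SerializableDoFn> can be deserialized back into the right concrete type.
#[typetag::serde]
trait SerializableDoFn {
  fn process(&mut self, element: String) -> String;
}

#[derive(Serialize, Deserialize)]
struct UppercaseDoFn;

#[typetag::serde]
impl SerializableDoFn for UppercaseDoFn {
  fn process(&mut self, element: String) -> String {
    element.to_uppercase()
  }
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
  let dofn: Box<dyn SerializableDoFn> = Box::new(UppercaseDoFn);
  // The payload carries a type tag ("UppercaseDoFn") alongside the serialized struct.
  let payload = serde_json::to_string(&dofn)?;
  // On the worker, the tag selects the registered deserializer and vtable.
  let mut restored: Box<dyn SerializableDoFn> = serde_json::from_str(&payload)?;
  assert_eq!(restored.process("hello".to_string()), "HELLO");
  Ok(())
}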

It's currently impossible (for good reason) to have typetag register generic types or generic traits (including traits with associated types), but a workaround can be achieved by setting up a custom macro with which we would annotate a DoFn or Coder struct to expand the UntypedDoFn implementation as an entirely non-generic one, like:

impl UntypedDoFn for MyDoFn {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
    Box::new(<Self as TypedDoFn>::process_element(self, element.downcast_ref::<<Self as TypedDoFn>::I>().unwrap()))
  }
}
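
A rough sketch of what that attribute macro could look like, assuming a separate proc-macro crate with syn and quote as dependencies (the do_fn name is illustrative; registration with typetag or another registry would be emitted in the same expansion):

use proc_macro::TokenStream;
use quote::quote;
use syn::{parse_macro_input, ItemStruct};

// Annotating a struct with the hypothetical #[do_fn] re-emits the struct and generates a
// non-generic UntypedDoFn impl that forwards to its TypedDoFn impl.
#[proc_macro_attribute]
pub fn do_fn(_attr: TokenStream, item: TokenStream) -> TokenStream {
  let input = parse_macro_input!(item as ItemStruct);
  let name = &input.ident;

  let expanded = quote! {
    #input

    impl UntypedDoFn for #name {
      fn process(&mut self, element: Box<dyn std::any::Any>) -> Box<dyn std::any::Any> {
        Box::new(<Self as TypedDoFn>::process_element(
          self,
          element.downcast_ref::<<Self as TypedDoFn>::I>().unwrap(),
        ))
      }
    }
  };

  expanded.into()
}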

One topic which requires some care and attention is utility DoFns and the desire to use modern patterns for simple Filter/FlatMap/Map transforms. Something like collection.map(|element: &String| -> String { element.to_uppercase() }) becomes impossible to write, because the closure type can't be represented without using generics. A clever way to hide this could be to write proc macros which unpack a closure expression's body and rewrite it as a struct with the necessary trait implementations, so that this can be expressed as collection.apply(Map!(|element: &String| -> String { element.to_uppercase() })). Utility functions like fn map<I, O, F: Fn(&I) -> O>(f: F) could potentially be shipped as part of a trait in a separate crate which adds those utility functions to PCollection, with serialization handled by e.g. serde_traitobject. This would enable us to ship Beam's core as a crate which only uses stable features, but allow users to explicitly opt into developer ergonomics with unstable Rust features.
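
For reference, the Map! macro could expand to something along these lines, reusing the TypedDoFn/UntypedDoFn traits sketched earlier. This is a sketch only: the struct name would be macro-generated, the serde derives stand in for whichever serialization scheme is chosen, and captured variables would additionally need to become serializable struct fields.

// Hypothetical expansion of Map!(|element: &String| -> String { element.to_uppercase() }):
// the closure body is lifted into a named, non-generic struct so it can be registered
// and serialized like any other DoFn.
#[derive(serde::Serialize, serde::Deserialize)]
struct MapUppercase;

impl TypedDoFn for MapUppercase {
  type I = String;
  type O = String;

  fn process_element(&mut self, element: &Self::I) -> Self::O {
    element.to_uppercase()
  }
}

// The UntypedDoFn impl comes from the blanket impl (or is emitted here by the macro).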

To complete this issue we simply need to agree whether we'd like to build a stable-only core SDK with nightly features in separate crates, or whether we embrace nightly features as part of the core SDK to provide more ergonomic ways of expressing ourselves.

Issue Priority

Priority: 3 (nice-to-have improvement)

Issue Components

sjvanrossum commented 1 year ago

I haven't fully evaluated rkyv yet, but its rkyv_dyn crate (which also uses inventory for its type registry) seems promising, since it does allow registration of generic implementations of non-generic traits, e.g.:

use std::any::Any;
use std::marker::PhantomData;

struct MyGenericDoFn<T> {
  marker: PhantomData<T>,
}

impl<T> TypedDoFn for MyGenericDoFn<T> {
  type I = String;
  type O = String;

  fn process_element(&mut self, element: &Self::I) -> Self::O {
    element.to_uppercase()
  }
}

impl<T> UntypedDoFn for MyGenericDoFn<T> {
  fn process(&mut self, element: Box<dyn Any>) -> Box<dyn Any> {
    Box::new(self.process_element(element.downcast_ref::<<Self as TypedDoFn>::I>().unwrap()))
  }
}

The only requirement is that users register them with concrete type parameters in the hopefully few cases where that's needed. Again, perhaps some of that can also be hidden behind a macro.