apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request][Go SDK]: Generics API for beam #21929

Open gonzojive opened 2 years ago

gonzojive commented 2 years ago

What would you like to happen?

Generics make beam code more readable.

Type registration is somewhat annoying because multiple type registrations are required for a single generic DoFn[T, U, ...], one for each concrete type required for a pipeline.

Here's a little library I wrote to play around with this. It works but doesn't cover the full API and isn't super thoughtful/consistent:

// Package beamgen contains utilities for working with beam pipelines using
// generics.
package beamgen

import (
    "context"
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/graphx/schema"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/samber/lo"
)

// Collection is a typed PCollection.
type Collection[T any] struct {
    underlying beam.PCollection
}

// PCollection returns a generic PCollection from a typed PCollection.
func (c Collection[T]) PCollection() beam.PCollection { return c.underlying }

// AssertType returns a Collection[T] from a PCollection that asserts it has a
// given type.
func AssertType[T any](coll beam.PCollection) Collection[T] {
    return Collection[T]{coll}
}

// Create returns a PCollection from a set of in-memory objects.
//
// Create inserts a fixed non-empty set of values into the pipeline. The values must
// be of the same type T and the returned Collection is of type T.
//
// The returned PCollections can be used as any other PCollections. The values
// are JSON-coded. Each runner may place limits on the sizes of the values and
// Create should generally only be used for small collections.
func Create[T any](scope beam.Scope, obj ...T) Collection[T] {
    asInterfaces := lo.Map(obj, func(elem T, _ int) any { return elem })
    untyped := beam.Create(scope, asInterfaces...)
    return Collection[T]{untyped}
}

//func ParDoFunc[InT, OutT any](scope beam.Scope, in Collection[InT], fn func(value InT))

// ParDoUnsafe is like beam.ParDo in that it accepts an `any` dofn, but the
// input and output collections are typed.
func ParDoUnsafe[InT, OutT any](scope beam.Scope, dofn any, inCol Collection[InT], opts ...beam.Option) Collection[OutT] {
    return Collection[OutT]{
        beam.ParDo(scope, dofn, inCol.PCollection(), opts...),
    }
}

type DoFnInterfaceStruct1[InT, OutT1 any] interface {
    ProcessElement(ctx context.Context, value InT, emit func(out OutT1)) error
}

// ParDo1 is like beam.ParDo, but the DoFn's ProcessElement signature is
// checked at compile time against the typed input and output collections.
func ParDo1[InT, OutT any](scope beam.Scope, dofn DoFnInterfaceStruct1[InT, OutT], inCol Collection[InT], opts ...beam.Option) Collection[OutT] {
    return Collection[OutT]{
        beam.ParDo(scope, dofn, inCol.PCollection(), opts...),
    }
}

type DoFnInterfaceStruct2[InT, OutT1, OutT2 any] interface {
    ProcessElement(ctx context.Context, value InT, emit1 func(OutT1), emit2 func(OutT2)) error
}

// ParDo2 is like beam.ParDo2, but the DoFn's ProcessElement signature is
// checked at compile time against the typed input and both typed outputs.
func ParDo2[InT, OutT1, OutT2 any](scope beam.Scope, dofn DoFnInterfaceStruct2[InT, OutT1, OutT2], inCol Collection[InT], opts ...beam.Option) (Collection[OutT1], Collection[OutT2]) {
    c1, c2 := beam.ParDo2(scope, dofn, inCol.PCollection(), opts...)
    return Collection[OutT1]{c1}, Collection[OutT2]{c2}
}

// ParDo1Func is like ParDo1, but accepts a plain function DoFn instead of a
// struct with a ProcessElement method.
func ParDo1Func[InT, OutT any](scope beam.Scope, dofn func(ctx context.Context, in InT, emit func(OutT)) error, inCol Collection[InT], opts ...beam.Option) Collection[OutT] {
    return Collection[OutT]{
        beam.ParDo(scope, dofn, inCol.PCollection(), opts...),
    }
}

// ParDoGBKFunc is used to handle the result of a call to GroupByKey.
func ParDoGBKFunc[InK, InV, OutT any](
    scope beam.Scope,
    dofn func(ctx context.Context, in InK, next func(*InV) bool, emit func(OutT)) error,
    inCol Collection[GroupedByKey[InK, InV]],
    opts ...beam.Option) Collection[OutT] {
    return Collection[OutT]{
        beam.ParDo(scope, dofn, inCol.PCollection(), opts...),
    }
}

// ParDoGBK is like ParDoGBKFunc, but accepts a struct DoFn whose
// ProcessElement consumes a grouped key and its value iterator.
func ParDoGBK[InK, InV, OutT any](
    scope beam.Scope,
    dofn interface {
        ProcessElement(ctx context.Context, in InK, next func(*InV) bool, emit func(OutT)) error
    },
    inCol Collection[GroupedByKey[InK, InV]],
    opts ...beam.Option) Collection[OutT] {
    return Collection[OutT]{
        beam.ParDo(scope, dofn, inCol.PCollection(), opts...),
    }
}

// ParDoGBK0 is like beam.ParDo0: it applies a DoFn to a typed grouped
// collection and produces no output collection.
func ParDoGBK0[InK, InV any](
    scope beam.Scope,
    dofn DoFnInterfaceGBK0[InK, InV],
    inCol Collection[GroupedByKey[InK, InV]],
    opts ...beam.Option) {
    beam.ParDo0(scope, dofn, inCol.PCollection(), opts...)
}

type DoFnInterfaceGBK0[InK, InV any] interface {
    ProcessElement(ctx context.Context, in InK, next func(*InV) bool) error
}

// DoFnInterfaceKVStruct is a DoFn that outputs a PCollection<KV<OutK, OutV>> from a PCollection<InT>
type DoFnInterfaceKVStruct[InT, OutK, OutV any] interface {
    ProcessElement(ctx context.Context, value InT, emit func(key OutK, value OutV)) error
}

// ParDoKV is a key/value version of ParDo
func ParDoKV[InT, OutK, OutV any](scope beam.Scope, dofn DoFnInterfaceKVStruct[InT, OutK, OutV], inCol Collection[InT], opts ...beam.Option) Collection[KV[OutK, OutV]] {
    return Collection[KV[OutK, OutV]]{
        beam.ParDo(scope, dofn, inCol.PCollection(), opts...),
    }
}

// KV is a phantom type that marks a Collection of key/value pairs; it holds
// no data and is used only at the type level.
type KV[K, V any] struct {
    // Key   K `json:"key"`
    // Value V `json:"value"`
}

// GroupedByKey is a phantom type that marks the output of GroupByKey.
type GroupedByKey[K, V any] struct{}

// GroupByKey is a PTransform that takes a PCollection of type KV<A,B>,
// groups the values by key and windows, and returns a PCollection of type
// GBK<A,B> representing a map from each distinct key and window of the
// input PCollection to an iterable over all the values associated with
// that key in the input per window. Each key in the output PCollection is
// unique within each window.
func GroupByKey[K, V any](scope beam.Scope, col Collection[KV[K, V]]) Collection[GroupedByKey[K, V]] {
    return Collection[GroupedByKey[K, V]]{
        beam.GroupByKey(scope, col.underlying),
    }
}

// type KeyExtractor[V, K any] struct {
//  fn func
// }

// func KeyBySimple[K, V](scope beam.Scope, col Collection[V], keyFn func(elem V) K) Collection

// RemoveDuplicates takes a PCollection with duplicate entries and returns a
// PCollection with the duplicates removed.
func RemoveDuplicates[T any](scope beam.Scope, col Collection[T]) Collection[T] {
    scope = scope.Scope("RemoveDuplicates")
    var table Collection[KV[T, T]] = ParDoKV[T, T, T](scope, &xToKVXXFn[T]{}, col)
    var grouped Collection[GroupedByKey[T, T]] = GroupByKey(scope, table)
    return ParDoGBK[T, T, T](scope, &keysOfGBKFn[T, T]{}, grouped)
}

// Reshuffle is a typed wrapper around beam.Reshuffle.
func Reshuffle[T any](scope beam.Scope, col Collection[T]) Collection[T] {
    return AssertType[T](beam.Reshuffle(scope, col.PCollection()))
}

// RemoveDuplicatesInit registers the DoFns used by RemoveDuplicates[T]. Each
// concrete T needs its own call from an init function, e.g.
// RemoveDuplicatesInit[string]().
func RemoveDuplicatesInit[T any]() {
    runtime.RegisterType(reflect.TypeOf((*keysOfGBKFn[T, T])(nil)).Elem())
    schema.RegisterType(reflect.TypeOf((*keysOfGBKFn[T, T])(nil)).Elem())
    runtime.RegisterType(reflect.TypeOf((*xToKVXXFn[T])(nil)).Elem())
    schema.RegisterType(reflect.TypeOf((*xToKVXXFn[T])(nil)).Elem())
}

// xToKVXXFn emits each element as both the key and the value.
type xToKVXXFn[T any] struct{}

func (f *xToKVXXFn[T]) ProcessElement(_ context.Context, in T, emit func(T, T)) error {
    emit(in, in)
    return nil
}

// keysOfGBKFn emits each grouped key once, ignoring its values.
type keysOfGBKFn[K, V any] struct{}

func (f *keysOfGBKFn[K, V]) ProcessElement(_ context.Context, key K, nextValue func(*V) bool, emit func(K)) error {
    emit(key)
    return nil
}

// keysOfGBKFnOld is an earlier function-form version of keysOfGBKFn.
func keysOfGBKFnOld[K, V any](_ context.Context, key K, nextValue func(*V) bool, emit func(K)) error {
    emit(key)
    return nil
}

// TextIOWrite writes a Collection of strings to filename, one element per line.
func TextIOWrite(scope beam.Scope, filename string, col Collection[string]) {
    textio.Write(scope, filename, col.PCollection())
}

// IterToSlice returns a slice from a beam iterator.
func IterToSlice[T any](next func(*T) bool) []T {
    var out []T
    for {
        var value T
        if !next(&value) {
            break
        }
        out = append(out, value)
    }
    return out
}

// IterForEachErr calls fn on each value from a beam iterator, stopping at and
// returning the first error.
func IterForEachErr[T any](next func(*T) bool, fn func(t T) error) error {
    for {
        var value T
        if !next(&value) {
            return nil
        }
        if err := fn(value); err != nil {
            return err
        }
    }
}

// AddFixedKey adds a fixed key (0) to every element.
func AddFixedKey[T any](scope beam.Scope, col Collection[T]) Collection[KV[int, T]] {
    return Collection[KV[int, T]]{
        beam.AddFixedKey(scope, col.PCollection()),
    }
}
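The library's core trick is a phantom type parameter: Collection[T] stores only an untyped handle, and T exists purely so the compiler can check that transforms line up. The sketch below models that idea without Beam. The type untyped stands in for beam.PCollection, and mapTo is a hypothetical typed transform whose real version would hand fn to beam.ParDo.

```go
package main

import "fmt"

// untyped stands in for beam.PCollection: a handle with no Go element type.
type untyped struct{ name string }

// Collection tags an untyped handle with an element type T. T appears only
// in the type, not in the stored data.
type Collection[T any] struct{ underlying untyped }

// mapTo is a hypothetical typed transform. The compiler checks that fn
// accepts the collection's element type; fn itself is not run here.
func mapTo[In, Out any](c Collection[In], name string, fn func(In) Out) Collection[Out] {
	_ = fn // a real version would hand fn to beam.ParDo
	return Collection[Out]{untyped{name: c.underlying.name + "/" + name}}
}

func main() {
	words := Collection[string]{untyped{name: "words"}}
	lengths := mapTo(words, "len", func(s string) int { return len(s) })
	fmt.Printf("%T %s\n", lengths, lengths.underlying.name)

	// Does not compile: lengths is a Collection[int], not a Collection[string].
	// mapTo(lengths, "bad", func(s string) int { return 0 })
}
```

Because the element type lives only in the type parameter, the wrapper adds no runtime cost; the trade-off is that nothing stops a caller from asserting the wrong T on an existing PCollection.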

Issue Priority

Priority: 2

Issue Component

Component: sdk-go

damccorm commented 2 years ago

Hey @gonzojive thanks for the suggestion - broader use of generics is absolutely something we want to see more of!

This doesn't do everything you're talking about here, but I did add generic registration functions which will be part of the next release (2.40). The docs aren't merged yet (here if you want a sneak peek), but it should allow you to register your DoFn, register all associated types, and set up performance optimizations that reduce runtime reflection, all with a single call.

This is just a start, and we should absolutely keep pushing usage of generics forward to improve the experience here.

lostluck commented 2 years ago

I like the general thrust of this suggestion! There's plenty to do to use generics with beam (see Beam Java's probable over-abundance of Generics). Our current goal is to add generics carefully, without breaking existing code.

First, as Danny points out, are the generic registrations, which make it easier to improve performance by removing per-call reflection at execution time.

We'd also like to introduce a "real" KV type, and make sure that's understood by the whole framework. No more splitting KVs (except to support older code). Similarly, generic iterator and emitter types would let the Go compiler do more work for inlining and efficient calls. While we're probably going to need to design our own "emitter" type or interface, it feels inevitable that Iterators/Iterables will have an official Go solution by Go 1.20, so we'd want to wait for that to become locked down instead of inventing our own interface/type.

Making the "front end" generic with typed PCollections is difficult once one gets to features like Side Inputs. I don't see fully compile time checked side inputs in the near term.

Either way, it's important to note that while the SDK's core will take time to include generic awareness, user DoFns and transforms will be able to take advantage of them much sooner.

All that said, something like the code in this example could be added to a package under ...pkg/beam/x/.. so we can allow users to experiment with it before we fully commit to a given wrapper/approach.

gonzojive commented 2 years ago

Thanks for sharing your thinking.

One issue I punted on is the use of the any constraint in T any below:

func Create[T any](scope beam.Scope, obj ...T) Collection[T] {
    asInterfaces := lo.Map(obj, func(elem T, _ int) any { return elem })
    untyped := beam.Create(scope, asInterfaces...)
    return Collection[T]{untyped}
}

It seems like the signature should be more like this, to ensure that only encodable values are passed around:

func Create[T Serializable](scope beam.Scope, obj ...T) Collection[T] { /* ... */ }
lostluck commented 2 years ago

Agreed, but unfortunately "Serializable" is not a real concept in Go, let alone as a generic constraint.

As it stands, primitive types and the exported fields of structs (exactly like JSON's restrictions) are efficiently encoded using Beam schemas. Interfaces, functions, and channels are not encodable, because there's no good way to encode them without registration lookup tables (largely handled going forward by the register package Danny mentioned).

Anything is serializable with enough prep work, though: via lookup tables, enums, or pre-known keys. Otherwise, it's simply difficult to make things work on distributed workers with only runtime knowledge.

Aside from deferring to the standard proto.Marshaller for protocol buffer messages (determined via the proto interfaces), users can also register their own arbitrary coder functions (which also gets around the "exported fields" restrictions). You've already filed a task for us to improve the documentation for that escape hatch (#21930).

lostluck commented 1 year ago

Just a note WRT the register package. With Go 1.21, type inference is improved sufficiently that one no longer needs to explicitly list out ProcessElement types in the register.DoFnNxM calls. Registrations are still required, but that should make things simpler.

diamondburned commented 1 year ago

As a small contribution to the wrapper library, this function adds a bit more runtime type safety when converting beam.PCollections:


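import (
    "fmt"
    "reflect"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
)

// PCollection is assumed to be the wrapper library's typed PCollection,
// declared over beam.PCollection (not shown in the original snippet).
type PCollection[T any] beam.PCollection

// CollectionOf returns a typed PCollection converted from the given
// beam.PCollection. It exists for documentation purposes. If T doesn't match
// what is actually in pc, the function panics.
func CollectionOf[T any](pc beam.PCollection) PCollection[T] {
    // reflect.TypeOf on a zero T is nil when T is an interface type, so use *T.
    want := reflect.TypeOf((*T)(nil)).Elem()
    if got := pc.Type().Type(); want != got {
        panic(fmt.Sprintf(
            "CollectionOf given unexpected PCollection type: expected %v, got %v",
            want, got))
    }
    return PCollection[T](pc)
}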
luillyfe commented 1 month ago

Any progress on this matter? I would love to contribute!

lostluck commented 1 month ago

@luillyfe No, but also Yes!

No in the sense of attempting to do so within the current SDK framework. It would involve either maintaining separate but compatible layers, or mixing and matching with existing DoFns.

The best contribution would be to attempt to get a real generic KV in, as mentioned in https://github.com/apache/beam/issues/21929#issuecomment-1159121423. This would mean ensuring that

  1. We correctly interpret the KV as a KV when doing the manual pipeline checking in the typex/funcx/graph packages.
  2. We handle them as input parameters and output parameters.
  3. We can similarly encode/decode these generic KVs as a user might expect.
  4. They get populated and handled through the exec package, ideally fixing/handling the notion of a KV in the current exec.FullValue type.
  5. Handling within the existing
  6. We support them in transforms like beam.Create, beam.CoGBK, beam.CombinePerKey, etc., and in the passert package, as one would expect.
  7. We make them interoperable with existing DoFns that use the "exploded" key, value pair.

If someone goes through all of that (these are table stakes for the feature), then we can get it in. Note that this doesn't have to be one giant PR; we just can't have a beam.KV type exist in the beam user surface package without it being useful. We don't want to break people.


So that's the No case.

What I mean by Yes is that I've been exploring the idea of a generics-forward Beam Go SDK in my own personal repo. I gave a talk about it at Beam Summit this year, but the recording isn't in the playlist yet (titled "Beam SDKs don't have to look the same").

You can see a bit of it here: https://github.com/lostluck/experimental/blob/master/altbeams/allinone2/allinone2.go#L137

An older benchmark run: https://github.com/lostluck/experimental/blob/master/altbeams/allinone2/beam/beam_test.go#L129

It's not quite ready for others to contribute to yet, but maybe I can finally get it out and basically useful for batch. The remaining things to do are a BlobIO/FileIO, a TextIO that uses that, windowing (though windows are passed around properly), initial documentation and examples, and of course moving it out of my experimental repo into its own standalone spot. But I do have SplittableDoFns working, and I have a strong opinion on the affordances allowed.

I mention this here since handling KVs was trickier than I expected, and it influenced much of the coder/type handling. It's possible for us to integrate some of these ideas into the existing SDK, but it'll be tricky.

I didn't set out to make a replacement SDK, only to explore ideas and see what would work; still, low per-element overhead and zero allocations on the hot path are compelling by themselves.