apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request][Go SDK]: Support Slowly Changing Side Input Pattern #23106

Open lostluck opened 2 years ago

lostluck commented 2 years ago

What would you like to happen?

Requires implementing Periodic Impulse and Periodic Sequence transforms for occasional re-reads of some data in a large window.

Requires using those re-reads as a side input.

Requires supporting the map_windows URN (https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L296) so that the SDK can map windows from one windowing strategy to another; this needs to be added to the big switch in exec/translate.go.

Currently, trying to manually use this pattern leads to a panic on execution when a runner makes the mapping request:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L734
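
For concreteness, here is a rough sketch of how the pattern might look in a pipeline once these pieces exist. A periodic.Impulse transform emitting one int64 tick per interval is assumed (this issue is asking for it), and lookupFn and processFn are placeholder DoFns, not part of any existing API:

package example

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
)

// lookupFn is a placeholder DoFn that would re-read the slowly changing
// data (e.g. a config table) each time a tick arrives.
func lookupFn(_ int64, emit func(string)) { emit("fresh-config") }

// processFn is a placeholder DoFn for the main input, reading the latest
// config from the side input iterator.
func processFn(word string, side func(*string) bool, emit func(string)) {
    var cfg string
    side(&cfg)
    emit(word + ":" + cfg)
}

// slowlyChangingSideInput wires the pattern together: periodic re-reads,
// windowed into fixed windows, consumed as a side input. Mapping each main
// input window onto the matching side input window is exactly what needs
// the map_windows URN.
func slowlyChangingSideInput(s beam.Scope, mainInput beam.PCollection) beam.PCollection {
    ticks := periodic.Impulse(s, time.Unix(0, 0), time.Unix(0, 0).Add(24*time.Hour), 5*time.Minute, false)
    updates := beam.ParDo(s, lookupFn, ticks)
    windowed := beam.WindowInto(s, window.NewFixedWindows(5*time.Minute), updates)
    return beam.ParDo(s, processFn, mainInput, beam.SideInput{Input: windowed})
}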

Issue Priority

Priority: 1

Issue Component

Component: sdk-go

manuzhang commented 2 years ago

Set to P2 since this is a feature request.

camphillips22 commented 1 year ago

How hard would it be to just add the map_windows urn support? I wrote a pipeline with a side input that isn't exactly the "slow changing side input" pattern, but the pipeline failed because map_windows isn't implemented.

For context, the pipeline was shaped approximately like:

flowchart TD

A[WindowInto] --> |"KV#lt;string, data#gt;"| B["Distinct(DropValue(col))"];
B --> C[Lookup configuration for keys];
C --> D["Map to KV#lt;string, config#gt;"];
D --> E
A --> |"KV#lt;string, data#gt;"| E["Emit new data based on config<br>ParDo(col, beam.SideInput(KV#lt;string, config#gt;)"];
lostluck commented 1 year ago

OK, I love that we can do charts in the markdown of an issue. Spectacular!

It's a matter of implementing the URN in the switch statement in exec/translate.go: decoding the appropriate payload proto and using that information to configure an exec node on the exec side of the pipeline; in this case, taking a timestamp and producing the appropriate window for it, based on the provided windowing strategy (I assume). The specific payload (likely a specific proto message) should be listed in the proto next to the URN enum in beam_fn_api.proto.
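
To make the mapping concrete, here is a hedged sketch of the timestamp-to-window computation for a fixed windowing strategy, using the SDK's millisecond mtime.Time; the type and names are illustrative, not the exec package's actual node:

package example

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
)

// intervalWindow mirrors the shape of Beam's IntervalWindow; it is an
// illustrative stand-in, not the SDK's own type.
type intervalWindow struct {
    Start, End mtime.Time
}

// mapToFixedWindow assigns an event timestamp to the fixed window that
// contains it: the window start is the timestamp rounded down to a multiple
// of the window size (floor-mod, so negative timestamps round correctly).
func mapToFixedWindow(ts mtime.Time, size time.Duration) intervalWindow {
    sizeMs := size.Milliseconds()
    startMs := int64(ts) - ((int64(ts)%sizeMs)+sizeMs)%sizeMs
    return intervalWindow{Start: mtime.Time(startMs), End: mtime.Time(startMs + sizeMs)}
}

For example, mapToFixedWindow(mtime.Time(7500), 3*time.Second) yields the window [6000, 9000).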

It's not particularly difficult, just a small pain to test, since I don't know how well the various semantics are implemented for portable use outside of Dataflow. Flink should be able to do it, and I recommend it for local iteration. Portable Python has issues with streaming semantics, and the Go direct runner is right out at present (it's in the process of being replaced with a portable implementation, because direct runners have problems).

So, my recommendation would be to add an integration test to the primitives package (https://github.com/apache/beam/blob/master/sdks/go/test/integration/primitives/windowinto.go). The simple pipeline you have there is a great base, and the implementer can follow the patterns of the existing integration tests, assuming it can be done in batch mode. The test can then be run against a few of the runners at once while implementing the changes on the SDK side, and filtered out from non-compliant runners in https://github.com/apache/beam/blob/master/sdks/go/test/integration/integration.go#L63, as determined by "what fails after it's working well on Dataflow and Flink" (the filter mechanism is sketched below).
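
For illustration, integration.go keeps per-runner lists of test-name patterns to skip, and a new map_windows test could be excluded from runners that don't support it yet along these lines (the test name below is assumed, not an existing entry):

// Excerpt-style sketch of the filter lists in
// sdks/go/test/integration/integration.go.
package integration

// directFilters lists regexes of tests to skip on the Go direct runner,
// which doesn't support the map_windows URN.
var directFilters = []string{
    "TestMapWindows",
}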

That approach slows down iteration time a bit (outside of simple unit testing), but it ends up with something that is tested and works on all the runners, without spending money on Dataflow, which shouldn't be a prerequisite of Beam development.


camphillips22 commented 1 year ago

Thanks for the feedback!

> decoding the appropriate payload proto and using that information to configure an exec node on the exec side of the pipeline; in this case, taking a timestamp and producing the appropriate window for it, based on the provided windowing strategy (I assume). The specific payload (likely a specific proto message) should be listed in the proto next to the URN enum in beam_fn_api.proto.

I actually got this far myself, but I'm having trouble understanding how to write the node's ProcessElement. It's not clear to me what the shape of the data is here, and there's no other node that's similar, so it's taking some time to understand how to write that bit. From my experimentation, the FullValue is empty, so I'm assuming that I need to create the coders and evaluate the ReStream, but I haven't gotten to that yet.

As far as integration tests, I also took the approach you suggested with a pipeline that looks like:

func MapWindows(s beam.Scope) {
    col := beam.ParDo(s, &makeTimestampedData{Data: []int{4, 9, 2, 3, 5, 7, 8, 1, 6}}, beam.Impulse(s))
    windowed := beam.WindowInto(s, window.NewFixedWindows(3*time.Second), col)
    mean := stats.Mean(s, windowed)
    filtered := beam.ParDo(s, filterAbove, windowed, beam.SideInput{Input: mean})
    globalFiltered := beam.WindowInto(s, window.NewGlobalWindows(), filtered)
    passert.Sum(s, globalFiltered, "a", 4, 30)
}

The problem with this was that it never hit the map_windows URN. Not sure if there's some difference between streaming and batch pipelines here, though. I wasn't sure how to test a streaming pipeline in a test like this (presumably with the teststream package, but that's not supported by Dataflow), so I ended up slightly modifying the streaming wordcap example to test it on Dataflow. Not spending money would be great though, so I'll try to get local Flink set up.

lostluck commented 1 year ago

That's what I was afraid of :/. We don't have much in the way of local streaming test infra, so Flink and Dataflow are the only places to run things for now. (The problem here is turning down the pipeline at the end: Beam doesn't support "terminating" a streaming pipeline from the inside, so it needs to happen outside.)

I'll look this up later, but for map_windows there should be no "values"; it should basically be just the EventTime timestamp, and the output would be the IntervalWindow for the configured windowing strategy. The question is whether that's moving as raw data or as a windowed-value header with a nonce value...


lostluck commented 1 year ago

I've been told that one needs to start the local Flink runner in "checkpointing" mode in order to get the desired behavior out of it for streaming things. I don't know how to do that offhand, but I'd look into how the Python streaming load tests set up their Flink instances.

I have also confirmed that a streaming splittable DoFn (one that returns ProcessContinuations) should gracefully end a streaming pipeline once all instances have returned a Stop, so we may have a tractable streaming testing scenario that doesn't require a forced cancellation.
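
As a rough sketch of what that looks like (hypothetical names; a real streaming SDF also needs registration and usually a watermark estimator), a splittable DoFn's ProcessElement returns a ProcessContinuation and stops once its restriction is exhausted:

package example

import (
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
)

// emitUntilFn is a hypothetical streaming splittable DoFn that emits one
// element per resume "tick" and stops after Count elements.
type emitUntilFn struct {
    Count int64
}

func (f *emitUntilFn) CreateInitialRestriction(_ []byte) offsetrange.Restriction {
    return offsetrange.Restriction{Start: 0, End: f.Count}
}

func (f *emitUntilFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
    return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
}

func (f *emitUntilFn) RestrictionSize(_ []byte, rest offsetrange.Restriction) float64 {
    return rest.Size()
}

func (f *emitUntilFn) SplitRestriction(_ []byte, rest offsetrange.Restriction) []offsetrange.Restriction {
    return []offsetrange.Restriction{rest}
}

// ProcessElement claims one position per invocation. ResumeProcessingIn asks
// the runner to call back after a delay; StopProcessing signals that this
// instance is done. Once every instance has stopped, the streaming pipeline
// can wind down without a forced cancellation.
func (f *emitUntilFn) ProcessElement(rt *sdf.LockRTracker, _ []byte, emit func(int64)) sdf.ProcessContinuation {
    pos := rt.GetRestriction().(offsetrange.Restriction).Start
    if !rt.TryClaim(pos) {
        return sdf.StopProcessing()
    }
    emit(pos)
    return sdf.ResumeProcessingIn(5 * time.Second)
}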

I don't love that we need to have a different configuration for Flink to behave properly in streaming contexts, but that's a different problem.

hnnsgstfssn commented 1 year ago

I've added a draft of the periodic.Sequence and periodic.Impulse transforms, but I'm having some issues and need some guidance around testing.

#25808 includes an example that I'm using to run this on Dataflow.

The pipeline runs and seems to correctly use the side input lookup as it receives input; however, when draining, it spits out the following trace.

Error message from worker: generic::unknown: process bundle failed for instruction process_bundle-5-4 using plan drain-S02-11 : panic: runtime error: index out of range [2] with length 2
Full error: while executing Process for Plan[drain-S02-11]:
2: DataSink[S[ptransform-9@localhost:12371]] Coder:W;coder-50>!IWC
3: PCollection[pcollection-32] Out:[2]
4: WindowInto[FIX[1m0s]]. Out:2
5: PCollection[pcollection-26] Out:[4]
6: ParDo[main.update] Out:[5] Sig: func(context.Context, mtime.Time, int64, func(int, string))
7: PCollection[pcollection-22] Out:[6]
8: SDF.ProcessSizedElementsAndRestrictions[periodic.sequenceGenDoFn] UID:8 Out:[7]
9: PCollection[pcollection-12-truncate-output] Out:[8]
10: SDF.TruncateSizedRestriction[periodic.sequenceGenDoFn] UID:10 Out:[9]
1: DataSource[S[ptransform-8@localhost:12371], 0] Out:10 Coder:W;coder-38,KV;coder-42>,double;coder-45>>!GWC
caused by:
panic: runtime error: index out of range [2] with length 2
goroutine 44 [running]:
runtime/debug.Stack()
    /usr/lib/go/src/runtime/debug/stack.go:24 +0x65
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic.func1()
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:58 +0xa5
panic({0xfb70a0, 0xc000137f08})
    /usr/lib/go/src/runtime/panic.go:884 +0x213
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*trInvoker).Invoke(0xc0001db0a0?, {0x11f5450?, 0xc00026b800?}, {0xf919c0?, 0xc000011728?}, 0xc0001db0a0?)
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf_invokers.go:320 +0x1b3
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*TruncateSizedRestriction).ProcessElement(0xc00026b7c0, {0x11f5450, 0xc00026b800}, 0xc0001dae00, {0x0, 0x0, 0x0})
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/sdf.go:345 +0x118
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*DataSource).Process(0xc000496c80, {0x11f5450, 0xc00026b800})
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/datasource.go:189 +0x510
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute.func2({0x11f5450?, 0xc00026b800?})
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:131 +0x42
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.callNoPanic({0x11f5450?, 0xc00026b800?}, 0x0?)
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/util.go:62 +0x6c
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec.(*Plan).Execute(0xc0000d3c20, {0x11f5450, 0xc00026b800}, {0xc000137ae8, 0x12}, {{0x11ed500?, 0xc0002a0ae0?}, {0x120a9f8?, 0xc0002a0b10?}})
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/exec/plan.go:130 +0x3da
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.(*control).handleInstruction(0xc0004fc000, {0x11f5338, 0xc0004d91a0}, 0xc00009a3c0)
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:407 +0xab7
github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main.func4({0x11f5338, 0xc0004d91a0}, 0xc00009a3c0)
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:193 +0x19d
created by github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness.Main
    /home/rru/temp/beam/sdks/go/pkg/beam/core/runtime/harness/harness.go:212 +0xfed

I'm now beyond my understanding of the SDK, but it seems to fail here by somehow treating the incoming element as a KV pair (?), judging from the caller here.

I'm not sure what's going on and would appreciate some guidance on addressing this. Perhaps I'm just missing something simple in the implementation?

hnnsgstfssn commented 1 year ago

Actually, the failing run above was on Beam 2.47.0.dev. Testing on Beam 2.45.0, the pipeline drains successfully; draining on 2.46.0 fails. Could it be a regression?