apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Feature Request][Go SDK]: Enable emitting to windowed PCollection from FinishBundle() #23963

Open camphillips22 opened 1 year ago

camphillips22 commented 1 year ago

What would you like to happen?

I originally asked this question on StackOverflow, but there does appear to be a gap between the Python/Java functionality and Go. According to this documentation, if you emit data into a PCollection from finish_bundle(), you must use a WindowedValue.

This concept doesn't exist in the Go SDK, and any data emitted from FinishBundle() is emitted into the SingleGlobalWindow. I see a TODO comment in the StartBundle() method that refers to a resolved Jira issue, BEAM-3303, but I don't see any existing GitHub issues that address this gap.

The code below isn't a functioning example, but it's essentially what I would like to be able to do in a streaming pipeline.

type BatchRpcFn struct {
  client        RpcClient
  bufferRequest *RpcRequest
}

func (f *BatchRpcFn) Setup(ctx context.Context) {
  // set up f.client and an empty f.bufferRequest
}

// ProcessElement buffers IDs and flushes once more than bufferLimit are pending.
func (f *BatchRpcFn) ProcessElement(ctx context.Context, id string, emit func(string, bool)) error {
  f.bufferRequest.Ids = append(f.bufferRequest.Ids, id)
  if len(f.bufferRequest.Ids) > bufferLimit {
    return f.performRequestAndEmit(ctx, emit)
  }
  return nil
}

// FinishBundle flushes the remaining IDs; today this output lands in the
// global window rather than in the input elements' windows.
func (f *BatchRpcFn) FinishBundle(ctx context.Context, emit func(string, bool)) error {
  return f.performRequestAndEmit(ctx, emit)
}

Issue Priority

Priority: 2

Issue Component

Component: sdk-go

lostluck commented 1 year ago

This is an unfortunate oversight.

The current SDK behavior is incorrect. It's a consequence of the SDK being tested heavily with global-window batch pipelines, and of this behavior being added before robust windowing support. Basically, what's going wrong is that we don't know a good event time for the batch that's emitted later. In general, we can't* know one from a framework perspective, since it's specific to the user's data.

The workaround is to use emitters of the form func(beam.EventTime, V) or func(beam.EventTime, K, V) in both ProcessElement and FinishBundle, while also requesting the current element's beam.EventTime in ProcessElement. Combined with storing a reasonable event time for the batch, this works around the windowing issue. The windowing would also need to be applied downstream of this DoFn, so that elements are assigned to the correct windows based on those event times.
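A minimal, dependency-free sketch of that workaround's bookkeeping follows. It assumes a hypothetical EventTime type standing in for beam.EventTime (milliseconds), and emitters of the shape func(EventTime, string, bool), mirroring func(beam.EventTime, K, V). The batchFn type and its lookup field (a stand-in for the batched RPC) are illustrative only, not Beam APIs.

```go
package main

import "fmt"

// EventTime is a hypothetical stand-in for beam.EventTime (milliseconds).
type EventTime int64

// pending is one buffered input: the ID plus the event time it arrived with.
type pending struct {
	id string
	ts EventTime
}

// batchFn models the BatchRpcFn from the issue without the Beam dependency.
type batchFn struct {
	limit  int
	buffer []pending
	// lookup stands in for the batched RPC (hypothetical).
	lookup func(ids []string) map[string]bool
}

// ProcessElement buffers the element together with its event time and
// flushes once the buffer reaches the limit.
func (f *batchFn) ProcessElement(ts EventTime, id string, emit func(EventTime, string, bool)) {
	f.buffer = append(f.buffer, pending{id: id, ts: ts})
	if len(f.buffer) >= f.limit {
		f.flush(emit)
	}
}

// FinishBundle flushes the remainder. Because the emitter takes an explicit
// event time, these outputs keep their inputs' event times.
func (f *batchFn) FinishBundle(emit func(EventTime, string, bool)) {
	f.flush(emit)
}

// flush performs one lookup for all buffered IDs and emits each result
// at the event time of the input that produced it.
func (f *batchFn) flush(emit func(EventTime, string, bool)) {
	if len(f.buffer) == 0 {
		return
	}
	ids := make([]string, len(f.buffer))
	for i, p := range f.buffer {
		ids[i] = p.id
	}
	results := f.lookup(ids)
	for _, p := range f.buffer {
		emit(p.ts, p.id, results[p.id])
	}
	f.buffer = f.buffer[:0]
}

type out struct {
	ts EventTime
	id string
	ok bool
}

func main() {
	var got []out
	emit := func(ts EventTime, id string, ok bool) { got = append(got, out{ts, id, ok}) }
	fn := &batchFn{limit: 2, lookup: func(ids []string) map[string]bool {
		m := make(map[string]bool, len(ids))
		for _, id := range ids {
			m[id] = len(id)%2 == 0
		}
		return m
	}}
	fn.ProcessElement(1000, "a", emit)
	fn.ProcessElement(2000, "bb", emit) // buffer full: flushes "a" and "bb"
	fn.ProcessElement(61000, "ccc", emit)
	fn.FinishBundle(emit) // flushes "ccc" at its own event time
	for _, o := range got {
		fmt.Println(o.ts, o.id, o.ok)
	}
}
```

One design choice: since this fn emits one result per ID, the sketch keeps each ID's own event time instead of a single time for the whole batch, so after a downstream beam.WindowInto each result lands in the same window as its input.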


So the SDK would need to prevent this bad behavior going forward: failing in strict modes, and otherwise providing a descriptive warning with instructions for fixing it. The SDK's DoFn analysis would need to allow a slight mismatch in emitters, so that the StartBundle/FinishBundle emitters are required to take an explicit event time but don't have to match the ProcessElement emitter exactly. This forces users to set explicit event times, which allows for correct windowing behavior.
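The relaxed emitter check could look roughly like this. It's a hypothetical sketch using reflect, not the Go SDK's actual DoFn analysis; EventTime again stands in for beam.EventTime, and checkFinishBundleEmitter is an invented name.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// EventTime is a hypothetical stand-in for beam.EventTime.
type EventTime int64

var eventTimeType = reflect.TypeOf(EventTime(0))

// payload returns an emitter's parameter types with a leading EventTime
// removed, and whether that EventTime was present.
func payload(emitter reflect.Type) ([]reflect.Type, bool) {
	var params []reflect.Type
	for i := 0; i < emitter.NumIn(); i++ {
		params = append(params, emitter.In(i))
	}
	if len(params) > 0 && params[0] == eventTimeType {
		return params[1:], true
	}
	return params, false
}

// checkFinishBundleEmitter sketches the relaxed rule: the FinishBundle emitter
// must take an explicit event time, but otherwise only has to emit the same
// element types as the ProcessElement emitter.
func checkFinishBundleEmitter(processEmit, finishEmit any) error {
	pt, ft := reflect.TypeOf(processEmit), reflect.TypeOf(finishEmit)
	if pt == nil || ft == nil || pt.Kind() != reflect.Func || ft.Kind() != reflect.Func {
		return errors.New("emitters must be functions")
	}
	pp, _ := payload(pt)
	fp, hasTime := payload(ft)
	if !hasTime {
		return fmt.Errorf("FinishBundle emitter %v must take an explicit EventTime first", ft)
	}
	if len(pp) != len(fp) {
		return fmt.Errorf("emitter element types differ: %v vs %v", pt, ft)
	}
	for i := range pp {
		if pp[i] != fp[i] {
			return fmt.Errorf("emitter element types differ: %v vs %v", pt, ft)
		}
	}
	return nil
}

func main() {
	process := func(string, bool) {}
	fmt.Println(checkFinishBundleEmitter(process, func(EventTime, string, bool) {}))
	fmt.Println(checkFinishBundleEmitter(process, func(string, bool) {}))
}
```

With this rule, a ProcessElement emitter func(string, bool) paired with a FinishBundle emitter func(EventTime, string, bool) passes, while a FinishBundle emitter without the leading event time is rejected.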

The event time is the big sticking point: if the elements don't have an event time, any subsequent windowing will have issues as well. So to permit existing broken code to continue to function, the SDK would need to analyze whether there's a downstream issue.

A robust GroupIntoBatches transform would avoid this: the GIB transform would do the grouping and handle the event time management, so it would take care of this logic for you. This is already on my "sooner" rather than "later" list (I can't be more precise than that).
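To illustrate why a GroupIntoBatches-style transform sidesteps the problem, here's a toy model (not Beam's GroupIntoBatches implementation) that buffers per key and per fixed window, so no batch ever straddles a window and each batch can be stamped with an event time inside its own window. The window size, batch size, and all type and function names are assumptions for illustration; EventTime stands in for beam.EventTime.

```go
package main

import (
	"fmt"
	"sort"
)

// EventTime is a hypothetical stand-in for beam.EventTime (milliseconds).
type EventTime int64

// Batch is one emitted group: all values share a key and a fixed window.
type Batch struct {
	Key    string
	Window EventTime // start of the fixed window
	Time   EventTime // latest element event time in the batch, inside Window
	Values []string
}

type bufKey struct {
	key    string
	window EventTime
}

// batcher is a hypothetical toy model of per-key, per-window batching.
type batcher struct {
	windowSize EventTime
	batchSize  int
	bufs       map[bufKey]*Batch
	emit       func(Batch)
}

func newBatcher(windowSize EventTime, batchSize int, emit func(Batch)) *batcher {
	return &batcher{windowSize: windowSize, batchSize: batchSize, bufs: map[bufKey]*Batch{}, emit: emit}
}

// Add assigns the element to its fixed window and flushes that buffer when full.
func (b *batcher) Add(key string, ts EventTime, v string) {
	win := ts - ts%b.windowSize // assumes non-negative timestamps
	k := bufKey{key, win}
	buf, ok := b.bufs[k]
	if !ok {
		buf = &Batch{Key: key, Window: win, Time: ts}
		b.bufs[k] = buf
	}
	buf.Values = append(buf.Values, v)
	if ts > buf.Time {
		buf.Time = ts
	}
	if len(buf.Values) >= b.batchSize {
		b.emit(*buf)
		delete(b.bufs, k)
	}
}

// Flush emits every partial batch, ordered by window then key for determinism.
func (b *batcher) Flush() {
	keys := make([]bufKey, 0, len(b.bufs))
	for k := range b.bufs {
		keys = append(keys, k)
	}
	sort.Slice(keys, func(i, j int) bool {
		if keys[i].window != keys[j].window {
			return keys[i].window < keys[j].window
		}
		return keys[i].key < keys[j].key
	})
	for _, k := range keys {
		b.emit(*b.bufs[k])
		delete(b.bufs, k)
	}
}

func main() {
	b := newBatcher(60000, 2, func(bt Batch) {
		fmt.Println(bt.Key, bt.Window, bt.Time, bt.Values)
	})
	b.Add("k", 1000, "a")
	b.Add("k", 59000, "b") // same window as "a": batch of 2 emitted
	b.Add("k", 61000, "c") // next window: buffered separately
	b.Flush()
}
```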


*Technically, for CombineFns the framework does make a specific choice for the equivalent behavior (what is a lifted CombineFn but a DoFn with fancy batching, after all?), but there we simply use Beam's default strategy.
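For context, here's a small sketch of the kinds of choices a framework can make when many inputs collapse into one output, modeled loosely on the EARLIEST, LATEST, and END_OF_WINDOW timestamp combiners in Beam's Java SDK. The types and names are hypothetical and this is not Go SDK API.

```go
package main

import "fmt"

// EventTime is a hypothetical stand-in for an event timestamp in milliseconds.
type EventTime int64

// Window is a half-open interval [Start, End).
type Window struct{ Start, End EventTime }

// MaxTimestamp is the last instant that still belongs to the window.
func (w Window) MaxTimestamp() EventTime { return w.End - 1 }

// Strategy names a way to pick one output time for many inputs.
type Strategy int

const (
	Earliest Strategy = iota
	Latest
	EndOfWindow
)

// outputTime picks the timestamp for a combined output of inputs in w.
// It assumes ts is non-empty and every timestamp lies inside w.
func outputTime(s Strategy, w Window, ts []EventTime) EventTime {
	switch s {
	case Earliest, Latest:
		best := ts[0]
		for _, t := range ts[1:] {
			if (s == Earliest && t < best) || (s == Latest && t > best) {
				best = t
			}
		}
		return best
	default:
		return w.MaxTimestamp()
	}
}

func main() {
	w := Window{Start: 0, End: 60000}
	ts := []EventTime{1000, 42000, 7000}
	for _, s := range []Strategy{Earliest, Latest, EndOfWindow} {
		fmt.Println(outputTime(s, w, ts))
	}
}
```

Each strategy keeps the output inside the inputs' window, which is exactly the property that FinishBundle output currently loses.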

lostluck commented 1 year ago

Intentionally labeling this as both a bug and a new feature for now: the SDK can't do this, and what it currently does is entirely incorrect, but it's not simply a 'fix'.