Status: Open. scwhittle opened this issue 11 months ago.
We could introduce a way to track how recently each cached BundleProcessor was used and evict only those that have been idle for a longer period, rather than applying a strict one-minute rule.
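A minimal sketch of that idea, using hypothetical names (this is not the actual `ProcessBundleHandler` code): each processor records when it was returned to the pool, and eviction removes only entries idle past the threshold instead of discarding the whole pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: per-processor idle tracking. Only processors that
// have sat unused past maxIdleMillis are torn down; fresh ones survive.
final class IdleAwareProcessorPool {
  private static final class Entry {
    final String processor; // stand-in for a cached BundleProcessor
    final long returnedAtMillis;
    Entry(String processor, long returnedAtMillis) {
      this.processor = processor;
      this.returnedAtMillis = returnedAtMillis;
    }
  }

  private final Deque<Entry> idle = new ArrayDeque<>();
  private final long maxIdleMillis;

  IdleAwareProcessorPool(long maxIdleMillis) {
    this.maxIdleMillis = maxIdleMillis;
  }

  /** Return a processor to the pool after a successful bundle. */
  void release(String processor, long nowMillis) {
    idle.addLast(new Entry(processor, nowMillis)); // newest at the tail
  }

  /** Reuse the most recently returned processor, if any survive eviction. */
  String take(long nowMillis) {
    evictExpired(nowMillis);
    Entry e = idle.pollLast();
    return e == null ? null : e.processor;
  }

  /** Oldest entries sit at the head; stop at the first fresh one. */
  void evictExpired(long nowMillis) {
    while (!idle.isEmpty()
        && nowMillis - idle.peekFirst().returnedAtMillis > maxIdleMillis) {
      idle.pollFirst(); // a real impl would also shut down the DoFns here
    }
  }

  int size() {
    return idle.size();
  }
}
```

With a 60-second threshold, a processor returned 70 seconds ago is evicted while one returned 20 seconds ago is kept, so a briefly idle stage does not lose its entire warm pool.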
What's the comparable lifetime in other contexts, e.g. Dataflow legacy worker streaming? Is there any eviction at all?
The Dataflow streaming runner never times these out, so that is certainly a valid option in my opinion.
Neither of which has a timeout.
We add all successfully processed DoFns back to the queue: https://github.com/apache/beam/blob/add34385719e53e4cc728267bfd2d8e8730c7267/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java#L1161
Yeah, for today we do know that there's a finite number of them, since the graph cannot change dynamically. I don't have historical context, nor have I looked at the commit history, so I don't know if there is a specific motivation here. Lacking that, I would also favor copying something that is known good, i.e. just not timing them out. Or we could at least set the timeout to, say, an hour. I suppose you can close up and free bounded sources that have completed, if that is not done some other way.
What happened?
A timeout of 1 minute is specified here: https://github.com/apache/beam/blob/master/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java#L923
The key of this cache is the bundle descriptor id (fused stage) and the value is a list of cached BundleProcessors. This means that if we don't process a stage for a minute, all of its bundle processors are destroyed. For long-lived streaming pipelines this is wasteful: stages are processed in parallel, and a stage that was previously processed will likely be processed again. Creating a BundleProcessor involves constructing the user DoFns and running their Setup methods, so it is non-trivial.
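To illustrate the problem, here is a simplified pure-Java model of an access-expiring per-stage cache (names are hypothetical; the real code at the linked line uses a cache library rather than this hand-rolled map). Because expiry is keyed on the stage, one idle minute drops the entire pool of expensive-to-build processors for that stage at once.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Simplified model of the per-stage BundleProcessor cache: key is the
// bundle descriptor id, value is a pool of processors. If the *stage*
// goes unaccessed past the TTL, the whole pool is discarded, even though
// each processor was costly to create (DoFn construction + Setup).
final class StageProcessorCache {
  private static final class Pool {
    final Queue<String> processors = new ArrayDeque<>(); // stand-in for BundleProcessor
    long lastAccessMillis;
  }

  private final Map<String, Pool> pools = new HashMap<>();
  private final long ttlMillis;

  StageProcessorCache(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  /** Returns a cached processor for the stage, or null if none is cached. */
  String take(String descriptorId, long nowMillis) {
    evictIdle(nowMillis);
    Pool pool = pools.get(descriptorId);
    if (pool == null) {
      return null;
    }
    pool.lastAccessMillis = nowMillis;
    return pool.processors.poll();
  }

  /** Returns a processor to the stage's pool after a successful bundle. */
  void release(String descriptorId, String processor, long nowMillis) {
    Pool pool = pools.computeIfAbsent(descriptorId, k -> new Pool());
    pool.lastAccessMillis = nowMillis;
    pool.processors.add(processor);
  }

  private void evictIdle(long nowMillis) {
    // One idle minute drops the entire pool for that stage.
    pools.values().removeIf(p -> nowMillis - p.lastAccessMillis > ttlMillis);
  }

  int cachedCount(String descriptorId) {
    Pool pool = pools.get(descriptorId);
    return pool == null ? 0 : pool.processors.size();
  }
}
```

In this model, a stage with two warm processors that goes 61 seconds without work loses both, and the next bundle for that stage must rebuild its DoFns from scratch.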
Some improvements:
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components