spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

new StateBaseAsyncDoFn with State and Timers #5055

[Open] albertols opened this issue 10 months ago

albertols commented 10 months ago

Feature request, motivated by BaseAsyncDoFn and KV lookups to avoid duplicates (instead of using Redis, Bigtable, a Hazelcast IMDG, etc.): https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java

  1. By using State and Timers, we could skip the request in processElement for elements that have already been sent: https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java#L78
  2. Abstracting the duplicate check: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java#L94C1-L95C1

protected abstract boolean alreadySent(@StateId("buffer") MapState<InputT, OutputT> buffer, InputT element);

and this should also be abstract: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java#L166

  3. Keeping the state somehow, through:

protected abstract void addIdempotentElementInBuffer(MapState<InputT, OutputT> buffer, InputT input, OutputT output);
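To make the two proposed hooks concrete, here is a minimal plain-Java model of the duplicate check, assuming no Beam dependency: a HashMap stands in for the MapState<InputT, OutputT> buffer, and a hypothetical callService function stands in for the async HTTP request. The class DedupBufferModel and its process/serviceCalls methods are illustrative names, not the code in the linked repository.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java model of the two proposed hooks. A HashMap stands in for Beam's
// MapState<InputT, OutputT> so the sketch runs without Beam (an assumption).
class DedupBufferModel<InputT, OutputT> {
  private final Map<InputT, OutputT> buffer = new HashMap<>();
  // Hypothetical stand-in for the async request the DoFn would issue.
  private final Function<InputT, OutputT> callService;
  private int serviceCalls = 0;

  DedupBufferModel(Function<InputT, OutputT> callService) {
    this.callService = callService;
  }

  // Mirrors the proposed alreadySent(buffer, element) hook.
  protected boolean alreadySent(Map<InputT, OutputT> buffer, InputT element) {
    return buffer.containsKey(element);
  }

  // Mirrors the proposed addIdempotentElementInBuffer(buffer, input, output) hook.
  protected void addIdempotentElementInBuffer(
      Map<InputT, OutputT> buffer, InputT input, OutputT output) {
    buffer.put(input, output);
  }

  // Duplicates return empty and never reach callService; new elements are
  // sent once and their output is recorded in the buffer.
  Optional<OutputT> process(InputT element) {
    if (alreadySent(buffer, element)) {
      return Optional.empty();
    }
    serviceCalls++;
    OutputT output = callService.apply(element);
    addIdempotentElementInBuffer(buffer, element, output);
    return Optional.of(output);
  }

  int serviceCalls() {
    return serviceCalls;
  }
}
```

In a real stateful DoFn the buffer would be declared with @StateId and StateSpecs.map(...), and Beam scopes that state to the current key and window, so each buffer only ever sees the elements of one key.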

NOTE: a GlobalWindow is needed (applied before applyTransform(ParDo.of(new StateAsyncParDoWithAkka(mediationConfig))).map { m =>) in order to keep the state across processElement calls.
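The GlobalWindow requirement follows from how Beam scopes state: per key and per window. With fixed windows, every new window starts with empty state, so a duplicate arriving in a later window is not detected. The dependency-free model below illustrates this by keying a "seen" set on (window start, key); StateScopeModel and the millisecond timestamps are assumptions for illustration, and a window size of 0 stands in for the GlobalWindow.

```java
import java.util.HashSet;
import java.util.Set;

// Plain-Java model of Beam's per-(window, key) state scoping.
// windowSizeMillis <= 0 models the GlobalWindow (one window for all time).
class StateScopeModel {
  private final long windowSizeMillis;
  private final Set<String> seen = new HashSet<>();

  StateScopeModel(long windowSizeMillis) {
    this.windowSizeMillis = windowSizeMillis;
  }

  // Start of the fixed window containing the timestamp, or 0 for the global window.
  private long windowStart(long timestampMillis) {
    if (windowSizeMillis <= 0) {
      return 0L;
    }
    return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
  }

  // True if the key was already seen in the same window (a detected duplicate).
  boolean isDuplicate(String key, long timestampMillis) {
    String scopedKey = windowStart(timestampMillis) + "|" + key;
    return !seen.add(scopedKey);
  }
}
```

The trade-off is that state in the global window never expires with a window, so it keeps growing unless it is cleared explicitly, for example from a timer.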

Happy to hear other alternatives: we are trying to drop the IMDG and external idempotency lookups. It would be really cool 👍🏼

Thanks, Scio team!!

kellen commented 10 months ago

IMDG = ?

BaseAsyncLookupDoFn has a type parameter for a cache implementation; would that suffice?

albertols commented 10 months ago


Thanks @kellen, I was checking it out too ;) Really interesting indeed. By IMDG I meant that we are using a Hazelcast cluster on GKE (with some microservices), and we are now prototyping a similar data-processing app in Scio.

I am going through https://github.com/spotify/scio/blob/0eece133a773e7ff85fc9e7fdcad1bfc3593fc1d/scio-test/src/test/scala/com/spotify/scio/transforms/AsyncLookupDoFnTest.scala#L273C33-L273C46, which is based on BaseAsyncLookupDoFn.

I am trying to figure out an implementation. I do not see many examples apart from https://github.com/spotify/scio/blob/0eece133a773e7ff85fc9e7fdcad1bfc3593fc1d/scio-test/src/test/scala/com/spotify/scio/testing/JobTestTest.scala#L1122. The only concern here would be scalability; I guess Dataflow vertical autoscaling might help.

I was also wondering about a TTL for each element (in some use cases an element could be released when it is evicted, e.g. to Pub/Sub). With Timers and @OnTimer we could have some interesting control over, and output for, each element.
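One way to picture a per-element TTL with Timers and @OnTimer: each new element schedules an expiry when it is first buffered, and when that timer fires the entry is evicted and can be emitted downstream. The sketch below models this without Beam, using a priority queue driven by an explicit clock; TtlTimerModel, offer and advanceTo are hypothetical names. In Beam itself a given timer id holds one pending firing per key and window, so a per-key timer is the closer analogue.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of a per-element TTL: a new key schedules an expiry
// (standing in for a Beam timer) and advancing the clock fires due timers.
class TtlTimerModel {
  private final long ttlMillis;
  private final Map<String, Long> expiryByKey = new HashMap<>();
  // Pending "timers" ordered by firing time: (expiryMillis, key).
  private final PriorityQueue<Map.Entry<Long, String>> timers =
      new PriorityQueue<>(Map.Entry.<Long, String>comparingByKey());

  TtlTimerModel(long ttlMillis) {
    this.ttlMillis = ttlMillis;
  }

  // Returns false for a key that is still buffered (a duplicate within its TTL);
  // otherwise buffers the key and schedules its expiry.
  boolean offer(String key, long nowMillis) {
    if (expiryByKey.containsKey(key)) {
      return false;
    }
    long expiry = nowMillis + ttlMillis;
    expiryByKey.put(key, expiry);
    timers.add(new AbstractMap.SimpleImmutableEntry<Long, String>(expiry, key));
    return true;
  }

  // Fires every timer due at or before nowMillis, evicting those keys and
  // returning them in firing order (what an @OnTimer method could output).
  List<String> advanceTo(long nowMillis) {
    List<String> evicted = new ArrayList<>();
    while (!timers.isEmpty() && timers.peek().getKey() <= nowMillis) {
      String key = timers.poll().getValue();
      expiryByKey.remove(key);
      evicted.add(key);
    }
    return evicted;
  }
}
```

Once a key is evicted it is accepted again, so the TTL bounds both the deduplication horizon and the amount of buffered state.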

Cheers

kellen commented 10 months ago

I still don't think I know what "IMDG" means here.

You are correct that BaseAsyncLookupDoFn could use a better example. You can plug in whatever cache supplier you want, e.g. a com.google.common.cache.Cache, and have that handle TTL, etc.
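With the cache-supplier route, expiry is delegated to the cache itself; Guava's CacheBuilder provides this through expireAfterWrite (and maximumSize to bound memory). The dependency-free sketch below only mimics expire-after-write semantics with an injectable clock so it can be tested deterministically. ExpiringCacheModel, put and getIfPresent here are hypothetical and are not the BaseAsyncLookupDoFn or Guava API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Dependency-free model of an expire-after-write cache (the behaviour Guava's
// CacheBuilder.expireAfterWrite provides); the clock is injected for testing.
class ExpiringCacheModel<K, V> {
  private static final class Timestamped<T> {
    final T value;
    final long writtenAtMillis;

    Timestamped(T value, long writtenAtMillis) {
      this.value = value;
      this.writtenAtMillis = writtenAtMillis;
    }
  }

  private final Map<K, Timestamped<V>> entries = new HashMap<>();
  private final long ttlMillis;
  private final LongSupplier clock;

  ExpiringCacheModel(long ttlMillis, LongSupplier clock) {
    this.ttlMillis = ttlMillis;
    this.clock = clock;
  }

  void put(K key, V value) {
    entries.put(key, new Timestamped<>(value, clock.getAsLong()));
  }

  // Returns the cached value, or null if absent or at least ttlMillis old
  // (expired entries are removed lazily on read).
  V getIfPresent(K key) {
    Timestamped<V> entry = entries.get(key);
    if (entry == null) {
      return null;
    }
    if (clock.getAsLong() - entry.writtenAtMillis >= ttlMillis) {
      entries.remove(key);
      return null;
    }
    return entry.value;
  }
}
```

Unlike Beam state, a cache held in worker memory is local to that worker, which is the main difference from the keyed State & Timers approach discussed in this thread.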

RustedBones commented 10 months ago

IMDG stands for In-Memory Data Grid

albertols commented 9 months ago

Hi Guys!

I was able to get a working State & Timers implementation for duplicate control: we could run "in-memory or low-latency" duplicate checks over 1M+ events (only when using GlobalWindows for Pub/Sub events; with FixedWindows and no triggers I have not been able to keep state across calls):

  1. Call: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/SCIOAsyncService.scala#L65
  2. Proposal of StateBaseAsyncDoFn: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/state/StateBaseAsyncDoFn.java
  3. Implementation of the abstract methods: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/http/StateAsyncParDoWithAkka.scala

Thus, we would not have to deal with a cache supplier (with 1M+ entries it could lead to memory issues), and it could be easier to scale (vertical autoscaling is only available in Dataflow Prime).

Thanks for reviewing!

P.S.1: OK/duplicates flow: [image]

P.S.2: I am currently testing in Dataflow with multiple workers.

P.S.3: I am facing some race conditions when records are mocked at the same time (e.g. scio-8000000246 and beam-8000000246):

[image]

It could be eliminated with a distinct step here (distinctBykeyUrl), but a trigger must be applied: https://github.com/albertols/scio-db/blob/develop/src/main/scala/com/db/myproject/async/SCIOAsyncService.scala#L30
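One way such a race can arise (an assumption, not a diagnosis of the linked code): if the buffer is written only when the async response completes, a second element with the same key that arrives while the first request is still in flight also passes the alreadySent check. The plain-Java model below contrasts writing on response with reserving the key as pending when the request is issued; InFlightDedupModel and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Models an async call whose response arrives after later elements may
// already have been checked against the buffer.
class InFlightDedupModel {
  private final boolean reserveOnIssue;
  private final Set<String> buffer = new HashSet<>();
  private int requestsIssued = 0;

  InFlightDedupModel(boolean reserveOnIssue) {
    this.reserveOnIssue = reserveOnIssue;
  }

  // Called when an element arrives; returns true if a request is issued.
  boolean onElement(String key) {
    if (buffer.contains(key)) {
      return false;
    }
    requestsIssued++;
    if (reserveOnIssue) {
      buffer.add(key); // mark as pending before the response is known
    }
    return true;
  }

  // Called when the async response for the key completes.
  void onResponse(String key) {
    buffer.add(key);
  }

  int requestsIssued() {
    return requestsIssued;
  }
}
```

Reserving on issue means a failed request must remove or retry the pending entry, which is the cost of closing the window between the check and the write.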

alberto-lopez-db commented 4 months ago

Regarding this potential enhancement (State & Timers for BaseAsyncDoFn), an article about it was published last week: https://medium.com/@serna.alberto.eng/avoid-http-requests-duplicates-in-apache-beam-with-scio-a-custom-baseasyncdofn-and-state-and-2c7d63059ab3

Hope it helps, @kellen, @RustedBones.