hazelcast / hazelcast-jet

Distributed Stream and Batch Processing
https://jet-start.sh

Implement a beam runner #437

Closed rmannibucau closed 5 years ago

rmannibucau commented 7 years ago

Beam (https://beam.apache.org/) provides a unified API for batch and streaming processing. It can be seen as a generic, portable API with multiple implementations (Spark, Flink, ...). Jet is quite close to these concepts, and it would be awesome to be able to run a Beam batch or streaming pipeline on top of Hazelcast.

The work is to write a runner. Using the Spark runner (or the direct runner) as a basis is probably the best approach (https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java).

I think it makes the most sense for Jet to be configurable through Beam options in client mode, but an embedded mode would also be awesome since it would allow testing and debugging (surely another option to add to the runner).

The work is mainly about converting the Beam model into the Jet one.

rmannibucau commented 7 years ago

I created a skeleton at https://github.com/rmannibucau/beam-hazelcast-runner, hoping it helps a bit. The visitor/translator that converts Beam logic into Jet logic still needs to be implemented; it is the core of the runner implementation. I'm not yet sure of the best way to align the two models in terms of sources (do we load all the data into Jet and then execute, or do we run executions that own the data?).
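To make the visitor/translator idea concrete, here is a minimal, self-contained Java sketch of the pattern, assuming the pipeline has already been flattened into topological order. `TranslatorSketch`, `Transform`, `Vertex` and the transform kinds are hypothetical stand-ins, not Beam or Jet classes; a real runner would plug into Beam's pipeline-visitor mechanism and build a Jet DAG instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of the visitor/translator pattern a runner uses. None of these
// types are Beam or Jet classes; they stand in for Beam transforms and Jet DAG vertices.
final class TranslatorSketch {

    // Stand-in for a node of the user's pipeline graph (e.g. a Beam transform).
    record Transform(String kind, String name) {}

    // Stand-in for a vertex of the execution DAG (e.g. a Jet vertex).
    record Vertex(String name, String processor) {}

    // One translator per transform kind; unknown kinds are rejected up front.
    static final Map<String, Function<Transform, Vertex>> TRANSLATORS = Map.of(
        "Read",       t -> new Vertex(t.name(), "source"),
        "ParDo",      t -> new Vertex(t.name(), "map/flatMap"),
        "GroupByKey", t -> new Vertex(t.name(), "partitioned aggregation"),
        "Write",      t -> new Vertex(t.name(), "sink"));

    // The "visitor": walk the transforms in topological order and translate each one.
    static List<Vertex> translate(List<Transform> pipeline) {
        List<Vertex> dag = new ArrayList<>();
        for (Transform t : pipeline) {
            Function<Transform, Vertex> translator = TRANSLATORS.get(t.kind());
            if (translator == null) {
                throw new UnsupportedOperationException("No translator for " + t.kind());
            }
            dag.add(translator.apply(t));
        }
        return dag;
    }
}
```

Rejecting unknown transform kinds up front keeps unsupported parts of the Beam model from failing silently at execution time.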

vladoschreiner commented 7 years ago

Hi Romain,

We have already experimented with Beam as a high-level API for Jet. The idea of Beam is interesting; however, we've parked our efforts until Beam becomes more mature.

Nevertheless, if you decide to contribute, we'll be happy to support you.

Vladimir

rmannibucau commented 7 years ago

Beam 2.0.0 was released very recently and brings API stability, so I guess it is mature now.

Edit: I updated the structure of the visitor a bit (to at least list the cases to handle, and to fix the test). Some help integrating "right" with Jet would be welcome (for example, how to handle windowing, grouping, etc. correctly).

The first iteration targets batch for now, but then we'll need to add streaming (yeah! :))

rmannibucau commented 7 years ago

Up

Now that Jet has windowing too, it fits Beam even better. Any hope of getting help with the interpreter implementation?

vladoschreiner commented 7 years ago

Hi Romain, we don't plan to integrate Jet with the Beam API in the next release. Nevertheless, if you'd like to contribute it, we are happy to support you with consultations.

rmannibucau commented 7 years ago

Hi Vladimir,

Yes, some pointers would be welcome on:

  1. the processor conversion/mapping (https://github.com/rmannibucau/beam-hazelcast-runner/blob/master/src/main/java/com/github/rmannibucau/beam/runner/hazelcast/HazelcastPipelineVisitor.java#L78)
  2. how to map PCollections (likely to IStream)

mtopolnik commented 7 years ago

Hi Romain,

we see one quite severe issue with Beam's model. It assumes a fully general, opaque window assignment policy, as expressed by this method of Beam's WindowFn:

  /**
   * Given a timestamp and element, returns the set of windows into which it
   * should be placed.
   */
  public abstract Collection<W> assignWindows(AssignContext c) throws Exception;

This conflicts with Jet's approach to optimizing windowed aggregations, where we leverage knowledge of the kind of window being constructed to avoid buffering all the data items, avoid duplicate computation of the segments of a sliding window, perform efficient combining in a two-stage aggregation, etc.
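The optimizations listed above depend on knowing the window shape. Below is a minimal Java sketch of the general frame-based (pane-based) technique, assuming fixed-size sliding windows whose size is a multiple of the slide and a simple sum aggregation. `FrameAggregation` and its methods are illustrative names, not Jet's API or Jet's actual implementation. Each item updates exactly one frame accumulator, partial accumulators from different members can be combined, and every window is assembled from the frames it covers.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of frame-based sliding-window aggregation (not Jet's API).
// Assumes windows of length `size` that slide by `slide`, with size a multiple of slide.
final class FrameAggregation {

    // Stage 1: each event updates exactly one frame accumulator; no items are buffered.
    // Key = frame start timestamp, value = running sum for that frame.
    static TreeMap<Long, Long> accumulateFrames(long[][] events, long slide) {
        TreeMap<Long, Long> frames = new TreeMap<>();
        for (long[] e : events) {                 // e[0] = timestamp, e[1] = value
            long frameStart = Math.floorDiv(e[0], slide) * slide;
            frames.merge(frameStart, e[1], Long::sum);
        }
        return frames;
    }

    // Stage 2 of a two-stage aggregation: merge partial accumulators from two members.
    // Only the per-frame accumulators need to be exchanged, not the raw items.
    static TreeMap<Long, Long> combine(Map<Long, Long> a, Map<Long, Long> b) {
        TreeMap<Long, Long> out = new TreeMap<>(a);
        b.forEach((k, v) -> out.merge(k, v, Long::sum));
        return out;
    }

    // A window's result combines the size/slide frames it covers, so each frame is
    // computed once and reused by every overlapping window.
    static long windowResult(TreeMap<Long, Long> frames, long windowStart, long size) {
        long sum = 0;
        for (long v : frames.subMap(windowStart, true, windowStart + size, false).values()) {
            sum += v;
        }
        return sum;
    }
}
```

For example, with events (1, 10), (2, 20), (7, 5), (12, 1), a slide of 5 and a size of 10, the frames are 0 → 30, 5 → 5 and 10 → 1, so the window [0, 10) yields 35 and the window [5, 15) yields 6.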

We can of course write new processors which respect this logic, but with crippling effects on performance.
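For contrast, here is a sketch of what a processor that honors an opaque `assignWindows` has to do, assuming a sliding-window assignment function in the spirit of Beam's `SlidingWindows`. The names `OpaqueWindowing`, `slidingAssign` and `bufferAll` are hypothetical. Because the runner cannot look inside the assignment function, it copies each item into every window it is assigned to and keeps all of those copies buffered.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;

// Hypothetical sketch: a runner that only sees an opaque "timestamp -> windows" function.
final class OpaqueWindowing {

    // A half-open time window [start, end).
    record Window(long start, long end) {}

    // Sliding-window assignment: every window of length `size` starting on a
    // multiple of `slide` that contains `ts`.
    static List<Window> slidingAssign(long ts, long size, long slide) {
        List<Window> windows = new ArrayList<>();
        long lastStart = Math.floorDiv(ts, slide) * slide;
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new Window(start, start + size));
        }
        return windows;
    }

    // The runner cannot inspect `assign`, so it stores a copy of each item in every
    // window it lands in, until that window can be emitted.
    static Map<Window, List<Long>> bufferAll(long[] timestamps, LongFunction<List<Window>> assign) {
        Map<Window, List<Long>> buffers = new HashMap<>();
        for (long ts : timestamps) {
            for (Window w : assign.apply(ts)) {
                buffers.computeIfAbsent(w, k -> new ArrayList<>()).add(ts);
            }
        }
        return buffers;
    }
}
```

With a window size of 10 and a slide of 5, every item is stored twice; the number of copies grows with size/slide, and no frame-level partial result can be shared between overlapping windows.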

rmannibucau commented 7 years ago

Hmm, this is great feedback, Marko. Would you care to send it to the Beam mailing list? Unless one of us is missing a way to work around it, it looks like a very critical issue in Beam (not to say a blocking one at some point), and Beam should be adapted to solve it. I keep thinking the Hazelcast model is smooth and nice, and Beam could benefit from exchanging with Jet on design to become the portable API it aims to be.

Would you mind sending this mail, perhaps with other potential blockers?

mtopolnik commented 7 years ago

This is just one of the issues. Another is the assumption that a dedicated thread runs each vertex, which clashes with our cooperative multithreading model. We use lazy sequences everywhere so that we can suspend and resume their processing; Beam prefers eagerly materialized collections, which also imply larger memory requirements.
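A minimal Java sketch of the cooperative model described above, assuming a single worker thread that round-robins over non-blocking tasklets and a bounded outbox. `Cooperative`, `Outbox` and `LazyEmitter` are hypothetical names; the shape is loosely modeled on the idea of a call that returns `false` to mean "not done yet", but the code does not use Jet's API. The emitter pulls from a lazy iterator and, when the outbox is full, remembers the pending item and returns so the thread can serve other tasklets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of cooperative multithreading: one worker thread drives many
// tasklets; none may block, so each emits lazily and yields when its output is full.
final class Cooperative {

    // A bounded output buffer (capacity >= 1), drained by the worker between calls.
    static final class Outbox {
        final ArrayDeque<Long> items = new ArrayDeque<>();
        final int capacity;
        Outbox(int capacity) { this.capacity = capacity; }
        boolean offer(long item) {
            if (items.size() >= capacity) return false;   // full: caller must yield
            items.add(item);
            return true;
        }
    }

    // A tasklet that emits a lazy sequence without ever materializing all of it.
    static final class LazyEmitter {
        private final Iterator<Long> source;   // produces items on demand
        private Long pending;                  // item that did not fit on the last call
        LazyEmitter(Iterator<Long> source) { this.source = source; }

        // Returns true when finished, false if it yielded because the outbox was full.
        boolean call(Outbox outbox) {
            while (pending != null || source.hasNext()) {
                long item = pending != null ? pending : source.next();
                if (!outbox.offer(item)) {
                    pending = item;            // resume from this item on the next call
                    return false;
                }
                pending = null;
            }
            return true;
        }
    }

    // One thread round-robins over tasklets and drains the outbox after every call
    // (the drain stands in for a downstream consumer; here it sums the items).
    static long runOnOneThread(List<LazyEmitter> tasklets, int capacity) {
        List<LazyEmitter> active = new ArrayList<>(tasklets);
        Outbox outbox = new Outbox(capacity);
        long sum = 0;
        while (!active.isEmpty()) {
            Iterator<LazyEmitter> it = active.iterator();
            while (it.hasNext()) {
                boolean done = it.next().call(outbox);
                while (!outbox.items.isEmpty()) sum += outbox.items.poll();
                if (done) it.remove();
            }
        }
        return sum;
    }
}
```

A model that gives each vertex a dedicated thread and hands it fully materialized collections would instead block that thread while waiting and hold the whole input in memory at once.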

Our current thinking is that over the next development cycle we'll try to come up with our own higher-level API, and in doing so we'll gain more insight into what kinds of API idioms make sense for us.

rmannibucau commented 7 years ago

Yep, I saw this one while trying to implement the Beam model.

I still think it is important to get in touch with the Beam community now, because otherwise the two would never converge, which would be bad for end users (the promise of Beam is great for enterprises).

cangencer commented 5 years ago

@rmannibucau This is in progress: https://github.com/hazelcast/hazelcast-jet-beam-runner

cangencer commented 5 years ago

Jet runner has now been merged: https://beam.apache.org/documentation/runners/jet/