bartosz25 / beam-learning

How can Apache Beam determine in code that the data in two PCollections are equal? #1

Open keeper-jie opened 2 years ago

keeper-jie commented 2 years ago

I want to use Apache Beam to implement K-means, but I can't tell in code whether the new and old PCollections are equal. Do you have a better answer or idea? Please check my StackOverflow question for details: https://stackoverflow.com/questions/71686576/how-to-compare-the-equality-of-elements-in-two-pcollection-in-apache-beam

In Spark, I can get the data with .collect() and share it globally with broadcast variables. In Apache Beam, a PCollectionView works much like a broadcast variable, but there is no way to get the data out of a PCollection or to determine whether two PCollections are equal.

I tried to use Object.equals, but it failed. I also tried aggregating the data with Sum.doublesGlobally() and converting the result to a PCollectionView, but I found that I could only read it through a DoFn side input. I then modified global variables from within the DoFn, but debugging showed that the main program's loop runs first and the body of the DoFn executes only afterwards, so there was no way to detect convergence and then terminate the loop in the main program.

bartosz25 commented 2 years ago

Hi,

This may not be the best answer, because I haven't had to collect results in Beam or work with the K-means algorithm. But maybe it will point you to another solution.

  1. If I understood the problem correctly, you're generating only doubles in an array? If yes, you should add a "key" to the array, so that you can join relevant rows from both sides: [1.2, 3.4, 5.4] would become [(0, 1.2), (1, 3.4), (2, 5.4)].
  2. Do the join as suggested in the SO answer. It should be a full outer join.
  3. Filter the joined result. If any combined row has the left or right side empty, or the double value is different, it means the PCollections aren't the same.
  4. Generate a meaningful result from that (maybe {"is_equal": false}?) and write it to a sink.
  5. From another process of your application, listen for the sink events and trigger a relevant action when the result is generated.
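The first four steps can be sketched in plain Python to show the comparison logic on its own. This is only an illustration under stated assumptions, not Beam code: the function names are hypothetical, the positional key assumes both arrays are in a stable order, and the `tolerance` parameter is an addition for comparing doubles approximately.

```python
import math


def key_by_index(values):
    """Step 1: attach a positional key to every value."""
    return dict(enumerate(values))


def full_outer_join(left, right):
    """Step 2: pair values by key; a side with no match becomes None."""
    return {k: (left.get(k), right.get(k)) for k in left.keys() | right.keys()}


def mismatches(joined, tolerance=0.0):
    """Step 3: keep rows with an empty side or with differing values."""
    bad = {}
    for key, (lhs, rhs) in joined.items():
        if lhs is None or rhs is None:
            bad[key] = (lhs, rhs)
        elif not math.isclose(lhs, rhs, rel_tol=0.0, abs_tol=tolerance):
            bad[key] = (lhs, rhs)
    return bad


def equality_result(old, new, tolerance=0.0):
    """Step 4: build the record that would be written to a sink."""
    joined = full_outer_join(key_by_index(old), key_by_index(new))
    return {"is_equal": not mismatches(joined, tolerance)}
```

In a real pipeline the join and the filter would be distributed transforms. The sketch only shows which rows count as a mismatch.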

I don't know of any way to "collect" a PCollection.

Best, Bartosz.

keeper-jie commented 2 years ago

Thank you for your reply.

I've tried joining the two PCollections with CoGroupByKey before, outputting a flag PCollection if they are equal. The problem is that I can't get the data in that PCollection in the main program, and writing it to a file as you suggested is too slow compared to the Spark implementation (Spark can compute in memory, and I have already implemented the K-means algorithm on Spark). I have also tried the following:

1) Passing the end-of-loop flag via PipelineOptions does not work. (https://beam.apache.org/documentation/programming-guide/#pipeline-options-cli)

2) A side input can expose the flag for whether the two PCollections have converged through a PCollectionView, but it can't terminate the loop in the main program, because the data in a PCollectionView can also only be read inside a DoFn. (https://beam.apache.org/documentation/programming-guide/#side-inputs)

3) The ValueState of DoFn (https://beam.apache.org/documentation/programming-guide/#valuestate) can also be used only in DoFn, and cannot be applied to the main program loop.

4) Metrics report information after the pipeline has executed, so they cannot be used inside the pipeline loop.

5) Event time triggers seem to be meant for streaming computations, whereas I am implementing the K-means algorithm on structured data, which means that the data is fixed. (https://beam.apache.org/documentation/programming-guide/#event-time-triggers)

What confuses me is that Apache Beam presents itself as a unified SDK for streaming and batch processing that can replace Spark, yet something as basic as judging whether two PCollections are equal is very difficult. Some machine learning algorithms need to determine whether the results have converged (the difference between the old and new results is so small that further iteration is pointless), and this is very hard to do. That is still very limiting.
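For reference, the convergence test itself (the difference between the old and new results is very small) amounts to the following arithmetic. This is a minimal plain-Python sketch, not Beam code. The names `max_shift`, `has_converged`, and `epsilon` are hypothetical, and it assumes both centroid lists are in the same cluster order.

```python
import math


def max_shift(old_centroids, new_centroids):
    """Largest Euclidean distance that any centroid moved in one iteration."""
    return max(math.dist(old, new) for old, new in zip(old_centroids, new_centroids))


def has_converged(old_centroids, new_centroids, epsilon=1e-4):
    """True when no centroid moved by epsilon or more."""
    return max_shift(old_centroids, new_centroids) < epsilon
```

The difficulty described above is not this computation. It is getting its boolean result back into the program that builds the pipeline.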

bartosz25 commented 2 years ago

Hi,

Sorry, I don't know how to implement the pipeline the Spark way :( I'm aware that uploading and downloading a file can be slower than reading data from memory. Still, when you call collect, you also pay for network traffic, because the driver asks the executors for their task results.

Now I can see a potential problem with the full outer join implementation, though. Spark's collect returns the results indexed by the task number (I blogged about this here: https://www.waitingforcode.com/apache-spark/collect-action-determinism/read). On the other hand, I don't know whether we can apply the same deterministic logic to the id generation (the 1st step in my solution).

> What confuses me is that Apache Beam presents itself as a unified SDK for streaming and batch processing that can replace Spark, yet something as basic as judging whether two PCollections are equal is very difficult. Some machine learning algorithms need to determine whether the results have converged (the difference between the old and new results is so small that further iteration is pointless), and this is very hard to do. That is still very limiting.

For me Apache Beam is an API that tries to cover 3 different APIs:

And it's quite challenging because even though they all process the data, they do it differently and have different features (e.g. savepoint and checkpoint in Flink vs checkpoint in Spark; micro-batch in Spark vs element-based processing in Flink, ...). Unfortunately, you seem to face one of the differences between these frameworks.

Personally, I've never used Beam as a "unified framework". Instead, I've always written Spark pipelines with the Spark API and Dataflow pipelines with the Beam API, since there is no Dataflow API available on GCP. Put another way, when I have a Dataflow job, I use Beam, and I know the Spark API well enough to write Spark code with it directly. I'm less fluent with Flink, but I think I would stay with the Flink API anyway.

Best, Bartosz.

PS. There is a capability matrix (https://beam.apache.org/documentation/runners/capability-matrix/) if you're interested in the differences.

keeper-jie commented 2 years ago

I think you are right. Because Spark provides MLlib, where K-means exists as a basic example algorithm, I thought it would be easy to implement with Apache Beam. But in practice, simply determining whether the algorithm has converged turns out to be difficult. On StackOverflow, I asked my question and got answers from Google engineers: because a pipeline may run on any distributed backend, it is not possible to get the data of a PCollection at runtime. I also checked in code: two PCollections created from memory with the same content are not equal, and PCollectionViews created the same way are not equal either. So I think Apache Beam has a long way to go before it can replace Apache Spark.
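One likely reading of that observation, stated as an assumption rather than a confirmed detail of Beam's implementation: a PCollection object is a handle to a step in the pipeline graph, not the data itself, so comparing two handles falls back to default object equality. The hypothetical `DeferredCollection` class below is a plain-Python stand-in for such a handle and is not part of any Beam SDK.

```python
class DeferredCollection:
    """Hypothetical stand-in for a handle to data that a runner computes later."""

    def __init__(self, source):
        # Only a recipe for the data is stored; nothing is materialized here.
        self._source = list(source)


a = DeferredCollection([1.2, 3.4])
b = DeferredCollection([1.2, 3.4])

# No __eq__ is defined, so equality compares object identity, not content.
handles_equal = a == b
same_handle_equal = a == a
```

Here `handles_equal` is false even though both handles were built from the same content, which matches the behaviour described above.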

But for algorithms that specify the number of iterations, the Apache Beam implementation is still more effective. When I was using Spark, I found its API very rich, while Apache Beam's concepts are more concise, because it provides only some basic APIs for the sake of generality.
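A fixed iteration count works because the loop no longer needs any runtime result: the same update step is simply applied a known number of times. The following is a minimal plain-Python sketch of that shape, not Beam code. The function names are hypothetical, and keeping the centroid of an empty cluster unchanged is an assumption made for the example.

```python
import math


def assign(points, centroids):
    """Return, for each point, the index of its nearest centroid."""
    return [
        min(range(len(centroids)), key=lambda i: math.dist(p, centroids[i]))
        for p in points
    ]


def update(points, labels, centroids):
    """Recompute each centroid as the mean of the points assigned to it."""
    new_centroids = []
    for i, centroid in enumerate(centroids):
        members = [p for p, label in zip(points, labels) if label == i]
        if not members:
            new_centroids.append(centroid)  # empty cluster: keep the old centroid
            continue
        new_centroids.append(tuple(sum(axis) / len(members) for axis in zip(*members)))
    return new_centroids


def kmeans_fixed(points, centroids, iterations):
    """Apply the assign/update step a fixed number of times, with no convergence test."""
    for _ in range(iterations):
        centroids = update(points, assign(points, centroids), centroids)
    return centroids
```

For example, `kmeans_fixed([(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)], [(0.0, 0.0), (10.0, 0.0)], 3)` returns `[(0.0, 1.0), (10.0, 1.0)]`.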

Thanks again for answering my questions.