proteus-h2020 / proteus-solma


SimpleReservoirSampling #12

Open WenjuanW opened 7 years ago

WenjuanW commented 7 years ago

Hi, when I run the code, in ReservoirSamplingITSuite, I set

    env.setParallelism(3)
    env.setMaxParallelism(3)

and

    val transformer = SimpleReservoirSampling()
      .setReservoirSize(4)

I got the result:


2> DenseVector(1100.0, 3.0, 0.0)
1> DenseVector(3000.0, 4.0, 0.0)
1> DenseVector(1427.0, 3.0, 0.0)
1> DenseVector(1940.0, 4.0, 0.0)
1> DenseVector(4478.0, 5.0, 0.0)
1> DenseVector(3031.0, 4.0, 0.0)
3> DenseVector(1600.0, 3.0, 0.0)
3> DenseVector(1320.0, 2.0, 0.0)
2> DenseVector(2200.0, 3.0, 0.0)
1> DenseVector(1985.0, 4.0, 0.0)
3> DenseVector(1604.0, 3.0, 0.0)
3> DenseVector(1000.0, 1.0, 0.0)
3> DenseVector(1200.0, 3.0, 0.0)
2> DenseVector(1811.0, 4.0, 0.0)
3> DenseVector(2400.0, 3.0, 0.0)
1> DenseVector(1534.0, 3.0, 0.0)
3> DenseVector(2300.0, 4.0, 0.0)
3> DenseVector(1839.0, 2.0, 0.0)
1> DenseVector(1890.0, 3.0, 0.0)
2> DenseVector(2132.0, 4.0, 0.0)
2> DenseVector(1664.0, 2.0, 0.0)
2> DenseVector(1203.0, 3.0, 0.0)
2> DenseVector(3890.0, 3.0, 0.0)
2> DenseVector(3137.0, 3.0, 0.0)

I guess the number before each DenseVector is the label of the parallel stream (1, 2, 3)? Since we set the reservoir size to 4 with setReservoirSize, we should get a fixed-size reservoir of 4 items for each parallel stream (or for the whole reservoir after merging, depending on the code).

I suspect this result is caused by data += element on Line 93 and Line 98. Line 97 is correct, and we should return reservoir as the sample, which has the defined size (4 in this case). I don't understand what data is for.

Please correct me if I am wrong. I tried to change the code but did not succeed. I am still struggling with Flink and some Scala syntax. Thanks.

VenturaDelMonte commented 7 years ago

Hi, exactly: each number on the left is the id of the parallel stream producing that vector. I see your concern, but we actually implemented it like this after a previous meeting we had with your team. Basically, we assume that each parallel instance receives one sample at a time and updates its internal partial reservoir (using the simple algorithm). Then we output only the changes to the reservoir (i.e., the item(s) that the algorithm possibly inserts). In this way, if the reservoir does not change, nothing is sent and network bandwidth is saved, whereas, if it does change, only the new item is sent downstream. Furthermore, we assume that sampling is always performed before something else; thus, to exploit maximum parallelism, we do not merge the parallel sampled items. I hope this helps in understanding our choices. If you have further questions, please feel free to comment and ask!

EDIT: data is just a data structure that contains the possible change in the reservoir. I used it because the whole operation is implemented in a flatMap (actually, a flatMapWithState).
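The behavior described above can be sketched in plain Scala, without Flink. This is only an illustration of the idea, not the SOLMA code: SamplerState and DeltaReservoir are hypothetical names standing in for the state kept by flatMapWithState, and the returned Seq plays the role of data (zero or one item per incoming element).

```scala
import scala.util.Random

// Hypothetical stand-in for the per-instance state: (items seen so far, current reservoir).
final case class SamplerState[T](seen: Int, reservoir: Vector[T])

object DeltaReservoir {
  // Processes one element and returns (items to emit, new state).
  // The emitted Seq is empty when the reservoir did not change.
  def step[T](k: Int, rnd: Random)(element: T, state: SamplerState[T]): (Seq[T], SamplerState[T]) = {
    val seen = state.seen + 1
    if (state.reservoir.size < k) {
      // Fill phase: every element enters the reservoir, so every element is emitted.
      (Seq(element), SamplerState(seen, state.reservoir :+ element))
    } else {
      // Replacement phase: draw a slot in [0, seen); replace only if it falls inside the reservoir.
      val j = rnd.nextInt(seen)
      if (j < k) (Seq(element), SamplerState(seen, state.reservoir.updated(j, element)))
      else (Seq.empty, SamplerState(seen, state.reservoir))
    }
  }
}
```

Under this scheme the number of emitted items can exceed the reservoir size, which matches the first output in this thread: the output is a log of insertions, while the reservoir itself stays at size k inside the state.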

WenjuanW commented 7 years ago

@VenturaDelMonte Thanks for getting back to me. The principle of reservoir sampling is that every item in the stream has the same probability of being selected, so that the samples in the reservoir can represent the whole stream. During simple reservoir sampling, the size of the reservoir is fixed. We should output the reservoir instead of outputting the changes. Then these samples can be used for something else.

Therefore, in this example, we should have 3 reservoirs for the 3 parallel streams, and each reservoir should have size 4.
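As a rough illustration of that expectation, here is a minimal sketch of the simple algorithm (often called Algorithm R) that returns the reservoir itself. It is plain Scala rather than Flink code, and SimpleReservoir.sample is a hypothetical helper, not part of SOLMA.

```scala
import scala.util.Random

object SimpleReservoir {
  // Returns a uniform sample of at most k items from the input.
  // Memory use is bounded by k regardless of how many items are consumed.
  def sample[T](items: Iterator[T], k: Int, rnd: Random): Vector[T] = {
    var reservoir = Vector.empty[T]
    var seen = 0
    items.foreach { item =>
      seen += 1
      if (reservoir.size < k) {
        reservoir = reservoir :+ item // fill phase
      } else {
        val j = rnd.nextInt(seen) // uniform in [0, seen)
        if (j < k) reservoir = reservoir.updated(j, item)
      }
    }
    reservoir
  }
}
```

Applying sample with k = 4 to each of three separate inputs would give three reservoirs of size 4, one per parallel stream, provided each input has at least four items.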

As I see it, data keeps the changes in the reservoir during sampling, which might grow without bound if the data stream is unbounded. That might cause a problem. Am I right?

Thank you for the links and explanation. Can we do flatMapWithState without data? I am trying to avoid data and output the reservoir instead, but I haven't made it work yet.

VenturaDelMonte commented 7 years ago

I see your point and it is an easy modification to make it output the samples. However, the way we implemented it has a rationale behind it:

That being said, I slightly changed the implementation to incorporate your suggestion [1]. Please feel free to try it. Furthermore, could you please point out whether the first implementation breaks any probabilistic assumption?

[1] https://github.com/VenturaDelMonte/SOLMA/commit/b5d9263c4d22a0510b6113b748c21d9f25a568c8

WenjuanW commented 7 years ago

Thank you very much, I saw the changes and tried them. Now the output looks like this:

1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0), DenseVector(1494.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0), DenseVector(1494.0, 3.0, 0.0), DenseVector(1239.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0), DenseVector(1494.0, 3.0, 0.0), DenseVector(1239.0, 3.0, 0.0), DenseVector(1380.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0), DenseVector(2000.0, 3.0, 0.0), DenseVector(1239.0, 3.0, 0.0), DenseVector(1380.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0), DenseVector(2000.0, 3.0, 0.0), DenseVector(4215.0, 4.0, 0.0), DenseVector(1380.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1534.0, 3.0, 0.0), DenseVector(2000.0, 3.0, 0.0), DenseVector(4215.0, 4.0, 0.0), DenseVector(1427.0, 3.0, 0.0))
2> ArrayBuffer(DenseVector(1940.0, 4.0, 0.0), DenseVector(2000.0, 3.0, 0.0), DenseVector(4215.0, 4.0, 0.0), DenseVector(1427.0, 3.0, 0.0))
1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0), DenseVector(2609.0, 4.0, 0.0))
1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0), DenseVector(2609.0, 4.0, 0.0), DenseVector(1839.0, 2.0, 0.0))
1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0), DenseVector(2609.0, 4.0, 0.0), DenseVector(1839.0, 2.0, 0.0), DenseVector(2162.0, 4.0, 0.0))
1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0), DenseVector(1852.0, 4.0, 0.0), DenseVector(1839.0, 2.0, 0.0), DenseVector(2162.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0))
1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0), DenseVector(1852.0, 4.0, 0.0), DenseVector(1985.0, 4.0, 0.0), DenseVector(2162.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0), DenseVector(1268.0, 3.0, 0.0))
1> ArrayBuffer(DenseVector(1416.0, 2.0, 0.0), DenseVector(1852.0, 4.0, 0.0), DenseVector(1985.0, 4.0, 0.0), DenseVector(1767.0, 3.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0), DenseVector(1268.0, 3.0, 0.0), DenseVector(1236.0, 3.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0), DenseVector(1268.0, 3.0, 0.0), DenseVector(1236.0, 3.0, 0.0), DenseVector(1962.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0), DenseVector(1458.0, 3.0, 0.0), DenseVector(1236.0, 3.0, 0.0), DenseVector(1962.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0), DenseVector(1458.0, 3.0, 0.0), DenseVector(2637.0, 3.0, 0.0), DenseVector(1962.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(2400.0, 3.0, 0.0), DenseVector(1458.0, 3.0, 0.0), DenseVector(2637.0, 3.0, 0.0), DenseVector(1437.0, 3.0, 0.0))
1> ArrayBuffer(DenseVector(2040.0, 4.0, 0.0), DenseVector(1852.0, 4.0, 0.0), DenseVector(1985.0, 4.0, 0.0), DenseVector(1767.0, 3.0, 0.0))
3> ArrayBuffer(DenseVector(852.0, 2.0, 0.0), DenseVector(1458.0, 3.0, 0.0), DenseVector(2637.0, 3.0, 0.0), DenseVector(1437.0, 3.0, 0.0))
3> ArrayBuffer(DenseVector(2300.0, 4.0, 0.0), DenseVector(1458.0, 3.0, 0.0), DenseVector(2637.0, 3.0, 0.0), DenseVector(1437.0, 3.0, 0.0))
1> ArrayBuffer(DenseVector(3031.0, 4.0, 0.0), DenseVector(1852.0, 4.0, 0.0), DenseVector(1985.0, 4.0, 0.0), DenseVector(1767.0, 3.0, 0.0))
3> ArrayBuffer(DenseVector(2300.0, 4.0, 0.0), DenseVector(1888.0, 2.0, 0.0), DenseVector(2637.0, 3.0, 0.0), DenseVector(1437.0, 3.0, 0.0))
1> ArrayBuffer(DenseVector(3031.0, 4.0, 0.0), DenseVector(1000.0, 1.0, 0.0), DenseVector(1985.0, 4.0, 0.0), DenseVector(1767.0, 3.0, 0.0))
3> ArrayBuffer(DenseVector(2300.0, 4.0, 0.0), DenseVector(1888.0, 2.0, 0.0), DenseVector(2637.0, 3.0, 0.0), DenseVector(2567.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(2300.0, 4.0, 0.0), DenseVector(1888.0, 2.0, 0.0), DenseVector(1320.0, 2.0, 0.0), DenseVector(2567.0, 4.0, 0.0))
3> ArrayBuffer(DenseVector(1200.0, 3.0, 0.0), DenseVector(1888.0, 2.0, 0.0), DenseVector(1320.0, 2.0, 0.0), DenseVector(2567.0, 4.0, 0.0))

I am not sure how to do this, but is it possible to output only the latest reservoir? It is not important to know how the reservoir elements change during the sampling. We only want the latest updated reservoir, which is a sample of the seen part of the stream.

Sorry I wasn't clear. When I said that the samples in the reservoir can represent the whole stream, I meant that the latest reservoir represents the seen part of the stream. Each item from the seen part has the same probability of being chosen into the reservoir. By the first implementation, do you mean collecting all the changes of the reservoir?
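The equal-probability claim can be checked directly for the simple algorithm. This is the standard derivation, not something taken from the SOLMA code. It assumes a reservoir of size k, n >= k items seen so far, and that item m > k is inserted with probability k/m into a uniformly chosen slot:

```latex
\begin{aligned}
\Pr[\text{item } i \text{ is in the reservoir after } n \text{ items}]
  &= \underbrace{\min\!\left(1, \tfrac{k}{i}\right)}_{\text{item } i \text{ enters}}
     \prod_{m=\max(i,k)+1}^{n}
     \underbrace{\left(1 - \tfrac{k}{m}\cdot\tfrac{1}{k}\right)}_{\text{survives item } m} \\
  &= \min\!\left(1, \tfrac{k}{i}\right) \prod_{m=\max(i,k)+1}^{n} \frac{m-1}{m}
   = \min\!\left(1, \tfrac{k}{i}\right) \cdot \frac{\max(i,k)}{n}
   = \frac{k}{n}.
\end{aligned}
```

The result does not depend on i, so every item of the seen part is equally likely to be in the latest reservoir. The property concerns the reservoir's contents at a given moment, independently of which records are sent downstream.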

VenturaDelMonte commented 7 years ago

Since the implementation I sent you also assumes that the stream is unbounded, it always outputs the latest reservoir of the seen part of the stream. This means that for each incoming item, the algorithm may output the latest updated reservoir.
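If a downstream step wants only the most recent reservoir per parallel instance, it can keep the last snapshot it received for each id. The following is a minimal sketch in plain Scala, assuming the output is available as (instance id, reservoir) pairs in arrival order; LatestSnapshot is a hypothetical helper, not part of SOLMA.

```scala
object LatestSnapshot {
  // Keeps, for each parallel instance id, only the most recently received reservoir.
  def latest[T](snapshots: Seq[(Int, Vector[T])]): Map[Int, Vector[T]] =
    snapshots.foldLeft(Map.empty[Int, Vector[T]]) {
      case (acc, (id, reservoir)) => acc.updated(id, reservoir)
    }
}
```

On an unbounded stream there is no final reservoir, so "latest" here always means the latest as of the records seen so far.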

Regarding the first implementation, the next algorithm in the pipeline would need to take care of either reconstructing the reservoir or processing only the new item. Perhaps we can find a middle ground by adding a config parameter that lets the SOLMA user decide whether she wants to output the whole reservoir or only the changed item. Would that work for you?
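One possible shape for such a parameter is sketched below, again in plain Scala. Every name here (OutputMode, ChangedItemOnly, WholeReservoir, ConfigurableReservoir) is hypothetical and only illustrates the proposal; none of them exists in SOLMA.

```scala
import scala.util.Random

// Hypothetical configuration switch for what gets sent downstream.
sealed trait OutputMode
case object ChangedItemOnly extends OutputMode
case object WholeReservoir extends OutputMode

object ConfigurableReservoir {
  // `seen` is the 1-based count of elements including the current one.
  // Returns (records to emit, updated reservoir). Nothing is emitted when the
  // reservoir is unchanged; otherwise one record is emitted, holding either
  // the inserted item alone or the full updated reservoir.
  def step[T](k: Int, mode: OutputMode, rnd: Random)(
      element: T, seen: Int, reservoir: Vector[T]): (Seq[Vector[T]], Vector[T]) = {
    val updated: Option[Vector[T]] =
      if (reservoir.size < k) Some(reservoir :+ element)
      else {
        val j = rnd.nextInt(seen)
        if (j < k) Some(reservoir.updated(j, element)) else None
      }
    updated match {
      case None => (Seq.empty, reservoir)
      case Some(next) =>
        val record = mode match {
          case ChangedItemOnly => Vector(element)
          case WholeReservoir  => next
        }
        (Seq(record), next)
    }
  }
}
```

Both modes update the reservoir identically; only the emitted record differs, so the sampling probabilities would be unaffected by the choice.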

WenjuanW commented 7 years ago

I think the baseline is that the latest updated reservoir is ready to be used by the SOLMA user. Maybe your definition of output here is different from mine. I think the output of an algorithm is the result of that algorithm. The purpose of SimpleReservoirSampling is to produce a sample that we can use for something else. Your output, I guess, is something shown on windows that the user can see?