Xtra-Computing / briskstream

A Multicore, NUMA Optimised Data Stream Processing System
Apache License 2.0

yahoo streaming benchmark on briskstream ? #5

Open chenzongxiong opened 5 years ago

chenzongxiong commented 5 years ago

Is there an implementation of YSB on briskstream? If not, are there any hints for implementing a new benchmark?

ShuhaoZhangTony commented 5 years ago

Hi,

Currently, no. You are very welcome to implement it on briskstream. You may want to check out "BriskRunner.java" and implement a simple application by mimicking the "WordCount" application. Please remember to check out the "stable" branch.

The API is almost identical to Storm's (except for some syntactic sugar), so it should be straightforward if the application is already implemented in Storm.

Tony.

chenzongxiong commented 5 years ago

Dear Shuhao, thanks for your quick reply.

When I tried to run the WordCount app with profiling, following issue #4:

java -cp target/BriskBenchmarks-1.2.0-jar-with-dependencies.jar applications.BriskRunner --profile

I encountered the following problem. It indicates that I haven't prepared a dataset for WordCount. Could you share the dataset you used?

image

I need to understand how it works and then implement YSB. Thanks in advance.

ShuhaoZhangTony commented 5 years ago

Hi,

For wordcount, I used ./briskstream/common/src/main/java/applications/tools/zipf.java to generate the datasets.

The input source is the Linux dictionary, normally at /usr/share/dict/words.

Besides, if you just need to run the program, you can pass --native, which involves no profiling or optimization.
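As a rough illustration of what generating a Zipf-distributed word stream from a dictionary can look like, here is a minimal sketch. It is not the repository's zipf.java; the class name `ZipfWordSampler`, the `skew` parameter, and the seed are hypothetical and chosen only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper for illustration; NOT the zipf.java shipped with briskstream.
class ZipfWordSampler {
    private final List<String> words;
    private final double[] cdf; // cumulative probabilities; the last entry is 1.0
    private final Random random;

    ZipfWordSampler(List<String> words, double skew, long seed) {
        if (words.isEmpty()) {
            throw new IllegalArgumentException("words must not be empty");
        }
        this.words = new ArrayList<>(words);
        this.cdf = new double[words.size()];
        this.random = new Random(seed);
        // The word at rank r (1-based) gets weight 1 / r^skew.
        double sum = 0.0;
        for (int rank = 1; rank <= cdf.length; rank++) {
            sum += 1.0 / Math.pow(rank, skew);
            cdf[rank - 1] = sum;
        }
        // Normalise the running sums into a cumulative distribution.
        for (int i = 0; i < cdf.length; i++) {
            cdf[i] /= sum;
        }
    }

    // Draws one word; lower-ranked (earlier) words are returned more often.
    String next() {
        double u = random.nextDouble();
        int lo = 0;
        int hi = cdf.length - 1;
        // Binary search for the first index whose cumulative probability is >= u.
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (cdf[mid] < u) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return words.get(lo);
    }
}
```

The word list could be loaded with `Files.readAllLines(Paths.get("/usr/share/dict/words"))` and each call to `next()` written to the dataset file. The output format BriskStream's WordCount expects is not specified in this thread, so check it against the repository.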

Tony

chenzongxiong commented 5 years ago

Hi, since I want to compare the throughput of my system with yours, I want to run your system under optimal conditions. That's why I am trying to profile first.

ShuhaoZhangTony commented 5 years ago

Hi,

Sure, please also remember to configure your machine specification accordingly, as done in ./briskstream/common/src/main/java/applications/Platform.java and ./briskstream/common/src/main/java/applications/HUAWEI_Machine.java.

Then, specify which machine you are using by passing the "--machine" argument.

Thanks!

Tony

chenzongxiong commented 5 years ago

Hi Tony, I tested your system with a single thread for each bolt, and the throughput is shown in the picture. image

Here is my configuration: image

For details, you can also view my source code.

Does the throughput make sense? Thanks in advance.

ShuhaoZhangTony commented 5 years ago

Hi Zongxiong,

The reported throughput of each operator (each executor, specifically) is the number of function invocations divided by the total duration. This is mainly for debugging purposes.

The reported throughput of different operators may vary significantly because of the queues between them. In your case, the Spout is running at a much higher speed and its output accumulates in its output queue. The ultimate application performance should be the throughput of the final operator (i.e., the Sink). But, again, please refer to the throughput reported by helper.java line 109 instead of boltThread.java (the part you highlighted in red).
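To make the measure concrete, the following minimal sketch computes throughput as events counted divided by elapsed time, and would be attached to the Sink to get the end-to-end figure. `ThroughputMeter` is a hypothetical class written for this example. It is not the code in helper.java or boltThread.java.

```java
// Hypothetical meter for illustration; not part of BriskStream.
class ThroughputMeter {
    private final long startNanos;
    private long count;

    ThroughputMeter(long startNanos) {
        this.startNanos = startNanos;
    }

    // Call once per processed batch with the number of tuples it contained.
    void record(int tuples) {
        count += tuples;
    }

    // Tuples per second observed between startNanos and nowNanos.
    double tuplesPerSecond(long nowNanos) {
        long elapsed = nowNanos - startNanos;
        if (elapsed <= 0) {
            return 0.0; // avoid division by zero before any time has passed
        }
        return count * 1e9 / elapsed;
    }
}
```

In practice the timestamps would come from `System.nanoTime()`. One meter on the Spout and one on the Sink will report different numbers whenever tuples are filtered, aggregated, or still sitting in queues.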

Regards, Tony

grtheod commented 5 years ago

Hey Tony,

This is a follow-up to the questions asked by @chenzongxiong (I've used some parts of his logic to implement my solution). I am also trying to implement the YahooBenchmark with BriskStream and make a fair comparison with my system. These are the parameters I pass to the BriskStreamRunner (I've configured my machine details: 2 sockets with 8 cores per socket and 64 GB RAM):

VM options: -server -XX:+UseConcMarkSweepGC -XX:NewRatio=2 -XX:SurvivorRatio=16 -Xms29g -Xmx29g
Program arguments: --app YSB --gc_factor 1 --backPressure --compressRatio -1 -st 1 -sit 1 --relax 1 --num_socket 2 --num_cpu 8 --machine 3 

In queries that have some aggregation logic, the throughput can only be measured at the source, because aggregation makes the throughput of the sinks significantly smaller.

From reading previous issues and playing with the code, I've seen that there are no window semantics, so I am emulating count-based windows (the logic is still incorrect, but let's assume it does the job). For simplicity, I have a spout that replays data, followed by a bolt that does all the processing, and finally a sink that just receives data. I didn't manage to use auto-scaling with the --tune flag, as it crashes most of the time. My main problem, though, is performance. With the following logic, BriskStream reaches 4 million tuples/sec (accumulating the numbers from the sources). On the same machine, I've managed to make Flink reach 22 million tuples/sec.

This is what the core logic looks like. I would be grateful if you could suggest what I am doing wrong here.

// Spout pseudocode
// load data as bytes in rawTuples
void loadDataFromFile() {...}
// create and forward new tuples by replaying from rawTuples
void nextTuple() {
    YSBTuple tuple = new YSBTuple(this.rawTuples[this.currentIndex++]); // create a new tuple
    collector.emit(tuple);
    if (this.currentIndex >= this.tupleToRead) {
        this.currentIndex = 0;
    }
}
// YahooBolt pseudocode
int counter = 0;
int windowSize = 10000; // emulate count-based windows
Map<String, Long> counts; // boxed Long: Java generics cannot take the primitive long
void execute(TransferTuple in) {
    int bound = in.length;
    for (int i = 0; i < bound; i++) {
        YSBTuple tuple = (YSBTuple) in.getValue(0, i);
        // filter based on some logic
        if (tuple.eventType.substring(0, 4).equals("view")) {            
            counts.updateCounter(tuple.campaignId); // count campaigns
            counter++;        
            if (counter == windowSize) { // emit results and reset counters
                for (Map.Entry<String, Long> entry : counts.entrySet()) {
                    collector.emit(4, new OutputTuple(entry.getKey(), entry.getValue()));
                }
                counter = 0;
            }
        }
    }
}

Cheers, George

ShuhaoZhangTony commented 5 years ago

Hi George,

I have three comments.

First, I still suggest measuring performance at the "Sink" rather than the "Spout". The reasons are twofold: 1) what users ultimately want is the final results (e.g., end-to-end latency/throughput); 2) the focus of BriskStream is to maximize the "Sink"'s output. That is to say, BriskStream may reduce the speed/parallelism of the Spout (as long as it's sufficiently large) in order to save resources and maximize the speed of other operators. This has some consequences (see Third).

Second, when "--tune" is disabled, you have to configure the parallelism of each operator manually. Besides, have you tuned "--bt" (i.e., the transferTuple batch size)? I'm not sure if you have configured them correctly.

Third, in its current stage, BriskStream assumes each operator has a constant workload for each input tuple. This does not hold universally. For example, "for (Map.Entry<String, Long> entry : counts.entrySet()) " seems to violate the assumption; you need to restructure the code to avoid this.
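One possible way to keep the work per input tuple constant is to update a single counter and hand back only that updated value, instead of iterating over the whole map when a window closes. The sketch below is an assumption about how the bolt could be restructured, not a fix confirmed in this thread. `IncrementalCampaignCounter` is a hypothetical name, and this approach changes what downstream operators receive (running counts rather than per-window snapshots).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical restructuring for illustration; not BriskStream code.
class IncrementalCampaignCounter {
    private final Map<String, Long> counts = new HashMap<>();

    // One hash-map update per "view" event, independent of how many campaigns exist.
    // The caller would emit (campaignId, returned count) as a single output tuple.
    long onView(String campaignId) {
        return counts.merge(campaignId, 1L, Long::sum);
    }
}
```

Each input tuple then triggers at most one map update and one emit, so no single call pays for the whole map.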

That's all I have in mind now. Thanks!

Tony.

grtheod commented 5 years ago

Hi Tony,

Thanks for the prompt response and help.

You are right about the tumbling window representation. I will add the code/logic from Storm for handling windows properly, but this was just the first step. The --tune flag was disabled because the system couldn't find a plan that utilized all the cores without crashing (I have to figure it out).

However, the batching did increase the performance, along with the removal of some redundant memory copies!
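For reference, a count-based tumbling window that actually resets its state when it closes can be sketched as follows. This is neither Storm's windowing API nor BriskStream code. `CountTumblingWindow` is a hypothetical class that only shows the reset step the pseudocode earlier in the thread leaves out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a count-based tumbling window.
class CountTumblingWindow {
    private final int windowSize;
    private final Map<String, Long> counts = new HashMap<>();
    private int seen;

    CountTumblingWindow(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Adds one event. Returns the per-key counts when the window closes, otherwise null.
    Map<String, Long> add(String key) {
        counts.merge(key, 1L, Long::sum);
        seen++;
        if (seen < windowSize) {
            return null;
        }
        Map<String, Long> result = new HashMap<>(counts); // snapshot of the closed window
        counts.clear(); // reset so the next window starts from zero
        seen = 0;
        return result;
    }
}
```

Emitting the whole snapshot when a window closes is still a burst of work on one input tuple, so this sketch addresses correctness only, not the constant per-tuple workload that BriskStream assumes.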

Regarding the throughput, in my mind, it is the number of ingested and processed records per time unit (from source).

Cheers, George