tuan08 opened 9 years ago
Here is the result I got when running the KafkaToKafkaDataflowUnitTest:
Message Tracker
-------------------------------------------------------
Partition  From  To     In Sequence  Duplication
-------------------------------------------------------
0          1     10000  true         2414[1,2,3,4,5...2561,2569,2572,2573,2582]
1          1     10000  true         2626[1,2,3,4,5...2712,2722,2743,2744,2757]
2          1     10000  true         2213[1,2,3,4,5...2360,2362,2380,2388,2405]
3          1     10000  true         0
4          1     10000  true         0
5          1     10000  true         0
6          1     10000  true         0
7          1     10000  true         0
8          1     10000  true         0
9          1     10000  true         0
Log Count: 107253
Dataflow Test Report
---------------------------------
Name               Value
---------------------------------
Source             input
Number Of Streams  10
Write Count        100000
Duration           3153ms
Sink               input
Number Of Streams  10
Read               107253
Duration           924ms
There are about 7253 duplicated messages. Try adding more debug code to the registry and to the dataflow task print-out, and figure out whether this behavior is correct. Currently the task calls commit after processing 100 records. Think about what happens if it processes 50 records and then gets interrupted: do both the source and the sink commit and roll back properly?
Hint: you need to find a way to identify the stream id, and a way to print out the offsets, to check that they advance correctly.
The goal is to learn the code and to improve your debugging and tracing skills.
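The commit-interval reasoning above can be sketched as a small simulation. This is a hedged illustration, not the dataflow's real API: `CommitIntervalDemo`, `simulate`, and the numbers are made up, but it shows why resuming from the last committed offset after an interruption yields duplicates in the sink (at-least-once delivery):

```java
import java.util.ArrayList;
import java.util.List;

public class CommitIntervalDemo {
    static final int COMMIT_INTERVAL = 100; // the task commits every 100 records
    static final int TOTAL_RECORDS = 150;

    // Returns how many duplicated records the sink ends up with.
    static int simulate() {
        List<Integer> sinkOutput = new ArrayList<>();
        int committedOffset = 0;

        // First run: the task writes 150 records to the sink, but only
        // commits the source offset after each full interval of 100.
        for (int offset = 0; offset < TOTAL_RECORDS; offset++) {
            sinkOutput.add(offset);               // record reaches the sink
            if ((offset + 1) % COMMIT_INTERVAL == 0) {
                committedOffset = offset + 1;     // source offset committed
            }
        }
        // Interrupted here: records 100..149 are in the sink, uncommitted.

        // Restart: the source resumes from the last committed offset,
        // so records 100..149 are delivered and written a second time.
        for (int offset = committedOffset; offset < TOTAL_RECORDS; offset++) {
            sinkOutput.add(offset);
        }
        return sinkOutput.size() - TOTAL_RECORDS;
    }

    public static void main(String[] args) {
        System.out.println("Duplicated records: " + simulate()); // prints 50
    }
}
```

The same mechanism scaled up to 10 streams of 10000 records can plausibly account for the ~7253 extra reads observed above.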
String command =
    "dataflow-test " + KafkaDataflowTest.TEST_NAME +
    " --dataflow-name kafka-to-kafka" +
    " --worker 3" +
    " --executor-per-worker 1" +
    " --duration 90000" +
    " --task-max-execute-time 1000" +
    " --source-name input" +
    " --source-num-of-stream 10" +
    " --source-write-period 0" +
    " --source-max-records-per-stream 10000" +
    " --sink-name input " +
    " --print-dataflow-info -1" +
    " --debug-dataflow-task-detail " +
    " --debug-dataflow-vm-detail " +
    " --debug-dataflow-activity-detail " +
    " --junit-report build/junit-report.xml" +
    " --dump-registry";
Note that the sink-name and the source-name are the same here.
If I just change the sink name to "sink" (or anything else), I get:
Message Tracker
-------------------------------------------------------
Partition  From  To     In Sequence  Duplication
-------------------------------------------------------
0          1     10000  true         0
1          1     10000  true         0
2          1     10000  true         0
3          1     10000  true         0
4          1     10000  true         0
5          1     10000  true         0
6          1     10000  true         0
7          1     10000  true         0
8          1     10000  true         0
9          1     10000  true         0
Log Count: 100000
Dataflow Test Report
---------------------------------
Name               Value
---------------------------------
Source             input
Number Of Streams  10
Write Count        100000
Duration           3078ms
Sink               sink
Number Of Streams  5
Read               100000
Duration           7535ms
It looks like many messages get duplicated when the task switches to suspended. Debug and review the task switch and the transaction handling to make sure the dataflow behaves correctly.
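One concrete thing to review when the task suspends is the order of the two commits. The sketch below is an assumption about the intended behavior, not the project's actual classes (`SuspendOrdering` and the `sink.commit` / `source.commit` steps are illustrative): the sink's buffered records must be made durable before the source offset advances, so a crash between the two steps causes duplicates rather than silent data loss.

```java
import java.util.ArrayList;
import java.util.List;

public class SuspendOrdering {
    // Returns the steps in the order they must run on suspend.
    // If the task dies AFTER "sink.commit" but BEFORE "source.commit",
    // the source re-reads from the old offset and the sink sees the same
    // records again (duplicates, at-least-once). The reverse order would
    // instead LOSE the records written since the last sink flush.
    static List<String> suspend() {
        List<String> trace = new ArrayList<>();
        trace.add("sink.commit");    // 1. flush buffered records to the sink
        trace.add("source.commit");  // 2. only then advance the source offset
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", suspend()));
        // prints: sink.commit -> source.commit
    }
}
```

A trace log like this around the real suspend path, tagged with the stream id and offset, should make it obvious whether the duplicates line up with interrupted commit intervals.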