calrissian / flowmix

Flowmix is a flexible event processing engine for Apache Storm. It supports complex correlations of events via sliding/tumbling windows. It allows parallel streams to be processed that can be grouped together in different ways.
Apache License 2.0

JoinBolt doesn't handle buffer == null #51

Open zqhxuyuan opened 9 years ago

zqhxuyuan commented 9 years ago

I see that the logic around cacheWindow (buffersForRule) and window (buffer) differs between AggregatorBolt, SortBolt and JoinBolt. In JoinBolt, nothing is done in case ① below (line 176):

                if (buffersForRule != null) {
                    buffer = buffersForRule.getIfPresent(flowInfo.getPartition());
                    if (buffer != null) {    // if we have a buffer already, process it
                        if (op.getEvictionPolicy() == Policy.TIME)
                            buffer.timeEvict(op.getEvictionThreshold());
                    }
                    // no else branch here ....  ①
                } else {
                    // create a new buffersForRule and buildWindow, then put it into buffer and buffersForRule...
                }

But AggregatorBolt and SortBolt both have this branch:

                if (cacheWindow != null) { 
                    window = cacheWindow.getIfPresent(flowInfo.getPartition());
                    if (window != null) {    // if we have a buffer already, process it
                        if (op.getEvictionPolicy() == Policy.TIME)
                            window.timeEvict(op.getEvictionThreshold());
                    } else {
                        //....
                    }
                } else {
                    //....
                }

Can you explain why it isn't needed here?
As I understand it, each partition should be given its own window/buffer.
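
To make the question concrete, here is a minimal, self-contained sketch of the get-or-create behaviour I would expect at ①. It is not the Flowmix code: `PartitionBuffers`, `Buffer` and `bufferFor` are hypothetical names, a plain `HashMap` stands in for the cache, and the clock is passed in as a parameter so the example is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

public class PartitionBuffers {

    // Hypothetical stand-in for a per-partition window/buffer.
    static final class Buffer {
        final Deque<Long> timestamps = new ArrayDeque<>();

        // Drop entries older than thresholdMillis relative to nowMillis.
        void timeEvict(long thresholdMillis, long nowMillis) {
            while (!timestamps.isEmpty()
                    && nowMillis - timestamps.peekFirst() > thresholdMillis) {
                timestamps.pollFirst();
            }
        }
    }

    // rule id -> (partition -> buffer); assumed layout for this sketch only.
    private final Map<String, Map<String, Buffer>> buffersByRule = new HashMap<>();

    Buffer bufferFor(String ruleId, String partition,
                     long thresholdMillis, long nowMillis) {
        // Outer "else" of the snippets above: create the per-rule map if missing.
        Map<String, Buffer> buffersForRule =
                buffersByRule.computeIfAbsent(ruleId, k -> new HashMap<>());

        Buffer buffer = buffersForRule.get(partition);
        if (buffer != null) {
            // We have a buffer already, so evict expired entries.
            buffer.timeEvict(thresholdMillis, nowMillis);
        } else {
            // Case ①: the rule is known but this partition is new,
            // so give the partition its own buffer.
            buffer = new Buffer();
            buffersForRule.put(partition, buffer);
        }
        return buffer;
    }
}
```

With this shape, a second partition arriving for an already-known rule gets its own buffer instead of leaving `buffer` as null.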