alibaba / jstorm

Enterprise Stream Process Engine
http://jstorm.io
Apache License 2.0

Trident BatchSpout does not work properly #239

Open improvejin opened 8 years ago

improvejin commented 8 years ago

We have a BatchSpout that implements storm.trident.spout.IBatchSpout:

public void emitBatch(long batchId, TridentCollector collector) {
    final boolean outputFirstPeriod = this._outputFirstPeriod;
    final boolean checkSelfKilled = this._checkSelfKilled && this._checkKilledIntervalSeconds < super.getPeriodSeconds();
    final boolean isFirstPeriod = this._firstPeriod;

    long emitCount = 0;

    log.info(String.format("[%s] emitBatch Begin, batchId %d ... ...", logId(), batchId));
    if (markBatchPerExecutor(batchId, this._topologyContext) == false){
        log.info(String.format("[%s] emitBatch ignore remaining tasks after first in same executor, batchId %d.", logId(), batchId));
        return;
    }
    if (isFirstPeriod && outputFirstPeriod == false){
        log.info(String.format("[%s] emitBatch ignore first period data, batchId %d.", logId(), batchId));
    }

    this._lastCheckKilledTime = System.currentTimeMillis();

    Calendar periodEnd = this.period.currentPeriodEnd();

    while (Period.nowStillBeforePeriodEnd(periodEnd)) {
        if (checkSelfKilled && needCheckSelfKilled() && isTopologyKilled()) {
            close();
            log.info(String.format("[%s] emitBatch exit, batchId %d, detect topology killed.", logId(), batchId));
            return;
        }

        MessageAndMetadata<byte[], byte[]> message = null;
        try {
            message = _iterator.next();
        } catch (final ConsumerTimeoutException e) {
            // No message arrived before the consumer timeout; message stays null and is handled below.
        }

        if (message == null) {
            if (checkSelfKilled && isTopologyKilled()) {
                log.info(String.format("[%s] emitBatch exit, batchId %d, detect topology killed after data timeout.", logId(), batchId));
                return;
            }
            continue;
        }
        if (isFirstPeriod && outputFirstPeriod == false){
            continue;
        }                      

        final List<Values> tuples = getTuplesToEmit(message);
        for(Values tuple : tuples) {
            collector.emit(tuple);
            emitCount += 1;
            String time = new String(message.key());
            log.trace(StringUtils.join(tuple, this._fieldSeparator) + " " + time + "  batchId" + batchId);
            if (this._printEmitData) {
                log.info(String.format("[%s] consume message time: %s, batchId%d", logId(), time, batchId));
                System.out.print(String.format("[%s] emit data: ", logId()));
            }
        }                       
    }

    if (isFirstPeriod) {
        this._firstPeriod = false;
    } 
    log.info(String.format("[%s] emitBatch end, batchId %d, %d data emitted", logId(), batchId, emitCount));
}

emitBatch is a long blocking loop: it runs until the current time window ends, so each window corresponds to one batch, and the first batch is ignored. This code works correctly on Storm but not on JStorm. The batchIds it prints have gaps, and the log contains entries like these:

    StormBoundedExponentialBackoffRetry:44 pool-17-thread-1] The baseSleepTimeMs [1000] the maxSleepTimeMs [30000] the maxRetries [20]
    .ThriftClient:108 pool-17-thread-1] masterHost:10.208.186.190:7627
    StormBoundedExponentialBackoffRetry:44 pool-17-thread-1] The baseSleepTimeMs [2000] the maxSleepTimeMs [60000] the maxRetries [5]

I don't really understand what these mean. Could you take a look?

hustfxj commented 8 years ago

There is a problem in JStorm Trident. We have already fixed it and will push the fix soon.

hustfxj commented 8 years ago

In the meantime, you can try setting spout.single.thread to true and running the topology again.
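
A minimal sketch of that workaround, assuming the setting is passed as an ordinary entry in the topology configuration map. Only the key `spout.single.thread` comes from this thread; the class name `SingleThreadSpoutConf` and the `build()` helper are hypothetical, and a plain `HashMap` stands in for the real topology config so the snippet runs without Storm or JStorm on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: builds a config map with the workaround flag set.
final class SingleThreadSpoutConf {

    // Key taken from the comment above; everything else here is illustrative.
    static final String SPOUT_SINGLE_THREAD = "spout.single.thread";

    static Map<String, Object> build() {
        Map<String, Object> conf = new HashMap<>();
        // Assumption: the flag is read as a boolean from the topology config.
        conf.put(SPOUT_SINGLE_THREAD, true);
        return conf;
    }

    public static void main(String[] args) {
        // Print the resulting map so the entry can be checked by eye.
        System.out.println(build());
    }
}
```

In a real topology the same entry would presumably go into the config object handed to the submitter, alongside the other topology settings.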

lizhiqiang123321 commented 7 years ago

The ICommitter stops running after executing a few times. I am using the SimpleBatchTopology example from the official site.