alibaba / jstorm

Enterprise Stream Process Engine
http://jstorm.io
Apache License 2.0
3.91k stars 1.81k forks

jstorm-kafka memory leak? #602

Open whaon opened 6 years ago

whaon commented 6 years ago
    public void ack(long offset) {
        try {
            pendingOffsets.remove(offset);
        } catch (Exception e) {
            LOG.error("offset ack error " + offset);
        }
    }

Offsets are only ever removed in ack(). If ack is not used, pendingOffsets keeps growing, which leads to a memory leak. Why does commitState() use first() instead of pollFirst()?

    public void commitState() {
        try {
            long lastOffset = 0;
            if (pendingOffsets.isEmpty() || pendingOffsets.size() <= 0) {
                lastOffset = emittingOffset;
            } else {
                lastOffset = pendingOffsets.first();
            }
            if (lastOffset != lastCommittedOffset) {
                Map<Object, Object> data = new HashMap<Object, Object>();
                data.put("topology", stormConf.get(Config.TOPOLOGY_NAME));
                data.put("offset", lastOffset);
                data.put("partition", partition);
                data.put("broker", ImmutableMap.of("host", consumer.getLeaderBroker().host(), "port", consumer.getLeaderBroker().port()));
                data.put("topic", config.topic);
                zkState.writeJSON(zkPath(), data);
                lastCommittedOffset = lastOffset;
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }
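For context on why first() is the safe commit point, here is a minimal sketch (hypothetical names, not JStorm's actual classes): everything below the smallest still-pending offset has been fully processed, so committing anything beyond it could lose an in-flight message on restart.

```java
import java.util.TreeSet;

public class CommitPoint {
    // Offsets that have been emitted but not yet acked.
    static TreeSet<Long> pendingOffsets = new TreeSet<>();

    // The offset that is safe to record in ZooKeeper: the smallest
    // pending offset, or the emitting position if nothing is pending.
    static long safeCommitOffset(long emittingOffset) {
        return pendingOffsets.isEmpty() ? emittingOffset : pendingOffsets.first();
    }

    public static void main(String[] args) {
        pendingOffsets.add(100L);
        pendingOffsets.add(101L);
        pendingOffsets.add(102L);
        pendingOffsets.remove(101L); // 101 was acked out of order
        // 100 is still in flight, so the commit point must stay at 100
        // even though 101 has already completed.
        System.out.println(safeCommitOffset(103L)); // prints 100
    }
}
```

Note that first() only reads the smallest element; removal still happens exclusively in ack(), which is exactly what ties the commit point to message completion.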

@unsleepy22

elloray commented 6 years ago

1. Because the point here is to guarantee that your message has been processed before the corresponding offset is removed. 2. As for pendingOffsets, first, it is declared as the interface SortedSet rather than TreeSet, so it has no pollFirst() method; second, even if pollFirst() were available, the offset should still only be removed after the ack. @whaon
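The API point can be checked against the JDK collections hierarchy: pollFirst() is defined on NavigableSet (added in Java 6), not on SortedSet, so a field typed as SortedSet cannot call it without a cast; first() only reads the smallest element and never removes it. A minimal sketch:

```java
import java.util.NavigableSet;
import java.util.SortedSet;
import java.util.TreeSet;

public class PollFirstDemo {
    public static void main(String[] args) {
        SortedSet<Long> asSorted = new TreeSet<>();
        asSorted.add(1L);
        // asSorted.pollFirst(); // does not compile: pollFirst() lives on
        //                       // NavigableSet, not SortedSet
        System.out.println(asSorted.first()); // reads, does not remove

        NavigableSet<Long> asNavigable = new TreeSet<>();
        asNavigable.add(1L);
        System.out.println(asNavigable.pollFirst()); // reads AND removes
        System.out.println(asNavigable.isEmpty());   // prints true
    }
}
```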

whaon commented 6 years ago

So if you use the kafka plugin, you have to use ack, then?
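The conclusion of the thread can be demonstrated with a plain-Java simulation (hypothetical names, not JStorm's classes): since ack() is the only code path that shrinks the pending set, a topology whose bolts never ack leaves every emitted offset pending forever.

```java
import java.util.NavigableSet;
import java.util.TreeSet;

public class PendingTracker {
    private final NavigableSet<Long> pendingOffsets = new TreeSet<>();

    // Called on emit: the offset becomes "in flight".
    void emit(long offset) {
        pendingOffsets.add(offset);
    }

    // Called on ack: the only path that removes an offset.
    void ack(long offset) {
        pendingOffsets.remove(offset);
    }

    int pendingCount() {
        return pendingOffsets.size();
    }

    public static void main(String[] args) {
        PendingTracker withAck = new PendingTracker();
        PendingTracker withoutAck = new PendingTracker();
        for (long off = 0; off < 10_000; off++) {
            withAck.emit(off);
            withAck.ack(off);     // downstream bolt acks every tuple
            withoutAck.emit(off); // downstream bolt never acks
        }
        System.out.println(withAck.pendingCount());    // prints 0
        System.out.println(withoutAck.pendingCount()); // prints 10000
    }
}
```

In Storm-style topologies, anchoring plus collector.ack(tuple) in each bolt (or extending a base bolt that acks automatically) keeps this bookkeeping bounded.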