Open wizali opened 9 years ago
hi
我的问题是,我注册了一个consumer,监听某个topic。
当我收到一条消息以后,我想让consumer暂停接收下一条消息,等这一条处理完以后再继续接收。也就是实现一条一条顺序处理。
我目前的处理方法是:当接收到一条消息以后,使用
consumer.pause();
然后等这条消息处理完以后,调用
consumer.resume();
但是这样好像不行。
请能告诉我怎么解决吗?
我连接到Storm就是一个接 一个了。
@wizali 目前不支持你说的这种方式,你所采用的 consumer.pause() 也并不能可靠的解决这个办法。我能想到两种方式你可以参考下:
lib/Client
进行封装,实现一个主动拉的 consumer,Kafka protocol 本质上也是 request/response 模式,但是每次 response 可能返回多条 message,所以你需要取收到的第一条 message 然后丢掉其他的,下次一再请求下一条 message@haio 我也遇到了这个问题,有个疑问,你说的“处理完一条后进行 commit”的这个 'commit' 是 HighLevelConsumer 的方法 commit 吗?我看代码里这个方法并不能传 offset 啊,我们应该处理完一条数据后传这个数据的 offset 和 partition 的对吧?
@haio 如果我维护了一份内存队列,但是我在处理过程中服务器崩溃了,这时我又还没commit已经处理过得任务,这种情况怎么处理好?