ali-sdk / ali-ons

Aliyun Open Notification Service Client
MIT License
151 stars 43 forks source link

消费失败后,如何重新拉取数据? #66

Closed geemo closed 5 years ago

geemo commented 5 years ago

消费失败后,如何从指定点位重新拉取数据,没有 ack 机制吗,只能重启服务,重新 consumer.subscribe 传入 consumeFromWhere 来拉取吗?

那假如一次性拉取 32 条,并行处理,发现第一条消费失败,那要再一次重新拉取 32 条,会有重复数据,设置成一次拉取 1 条的话,效率又不行。。。

denghongcai commented 5 years ago

可以先阅读 RocketMQ 的实现机制,首先消息投递的原则是至少投递一次,

消费位点是用 offset 来维护的,要手工指定位点,就需要 consumeFromWhere

如果是消费失败的场景,一条消息消费失败(subscribe 抛错),会自动发送重试消息

geemo commented 5 years ago

因为我在 subscribe handler 中使用了 worker_thread post 了数据到工作线程中,因为是新模块,这一步 postMessage 在 worker 退出的时候 post 依然不会报错,所以 handler 中 catch 不到异常

于是我想到了一个策略,但是需要每次获取到符合 subExpression 批量拉取的实际数条数做 lockNum,并设定超时时间,每次处理完成一条后会 update timeout,超时后会跑出异常。

所以如何在 handler 中获取到实际拉取的条数呢?? @denghongcai

如何获取 consumeTasks.length

denghongcai commented 5 years ago

你用这种手段的的话建议是确认 worker 开始处理后在 handler 里等待 worker 执行结束,例如

await worker.done();

根据结果来抛错

geemo commented 5 years ago

直接在 handler 中进行 await worker.done(); 不太好。

因为我的 worker 是用来做流式数据处理的(分为多个通道,每个通道由多个插件 transform 流组成来进行流式计算)。

那么我在拉取数据到这批数据流式计算处理完毕,需要保证有 >= 1 个 常驻 handler 来触发失败重试,多余的 handler 我想在主线程 post 完数据到子线程后就可以立马出栈释放。也就引出了上面使用 lockNum + timeout 的方式。

以上问题在于,ali-ons sdk 在 _consumeMessageLoop 中默认帮我们做了并发任务消费,想问下后续版本中能否在 subscribe handler 中直接将过滤好的 msgs 返回就行了,并发处理那部分可以用户自己来做? @denghongcai @gxcsoccer

denghongcai commented 5 years ago

也不是不可以,只是就算你自己做并发任务消费,你应该还是要等并发处理完后才能拉下一批消息,自己控制的思路不是太明白

gxcsoccer commented 5 years ago

现在蚂蚁这边的实现改成:

拉一批,并行消费

gxcsoccer commented 5 years ago

因为 sendBack 失败的概率比较低,所以基本上可以保证消费速度

geemo commented 5 years ago

基本解决了,大致实现思路如下:

// 主线程分发数据
consumer.subscribe(rocketmqConf.topic, '*', async msg => {
  const msgId = msg.msgId;
  const buf = Buffer.concat([
    msg.body,
    Buffer.from(`||${msg.properties.TAGS}||${msgId}`)
  ]);
  selectWorker(workers).postMessage(buf);
  // 锁住
  await wait.lock(msgId);
});
// 主线程接收到工作线程 post 的数据后
worker.on('message', msg => {
  const msgId = msg.msgId;
  // 调用后 subscribe handler 会退出
  wait.free(msgId);
});
// wait 实现
'use strict';

// resolve and reject map
let rjMap = {};
// 30s timeout
let expirTime = Date.now() + 30 * 1000;

function lock(msgId) {
  return new Promise((resolve, reject) => {
    rjMap[msgId] = { resolve, reject };
  });
}

function free(msgId) {
  rjMap[msgId].resolve();
  expirTime = Date.now() + 30 * 1000;
  delete rjMap[msgId];
}

setInterval(() => {
  let now = Date.now();
  if (now > expirTime) {
    for(let key in rjMap) {
      rjMap[key].reject(new Error('subscribe handle timeout'));
      break;
    }

    rjMap = {};
  }
}, 500);

module.exports = {
  lock,
  free
};