abbshr / abbshr.github.io

人们往往接受流行,不是因为想要与众不同,而是因为害怕与众不同
http://digitalpie.cf
444 stars 44 forks source link

Debug: 关于回调函数是否应该为纯异步执行的思考 #61

Open abbshr opened 7 years ago

abbshr commented 7 years ago

TL;DR. 在测试 Kafka 消费速率时无意间碰到一个错误, 引出了一个对回调函数是否应该为异步的话题.

等等, 让我把这句话的前因后果说清楚. 事情是这样的:

我在用 Kafka 构建团队的 data-pipeline, 经过一个月的时间终于要接近尾声了.

为了方便监控 partition log 累积情况以及消费情况, 我们使用了一个开源的管理面板. 它可以正常显示 kafka console consumer 及其消费的情况, 然而我们自己编写的消费者却捕获不到.

今晚为了查明原因, 我用 Kafka client SDK 自己编写了一个简单的消费者, 先去消费一个 Topic.

此处声明, 我们的消费者是在 Node.js 平台上运行的.

我在一边控制生产 10w 条消息写入一个 partition, 消费者紧接着开始消费, 我在终端输出了消费计数, 当消费到 1000 个左右时出现了意外:

error

一看错误就知道哪里爆栈了. 但是, 这怎么可能呢?

这里不得不提一下我们的消费者构成:

我们使用 Kafka 的最根本原因不是看中了性能, 而是为了构建数据管道所需的消息的持久化和严格的顺序保障, 其中 数据集成 就依赖一个状态变更序列. 所以在 kafka-consume-framework 中, 为了保证每条消息按顺序准确的成功消费(比如完成在其他系统的读写)一次, 我对每个 topic(这里即单个 partition) 中的一条消息在消费成功后都提交一次 offset, 当提交成功后再进行下一次的消费, 这是一个完全串行的任务序列:

------------------------------------------------------------------> kafka brokers
 ↓ (1. 拉一条消息)                             ↓ (4. ACK & 从上次 offset 处拉新的消息)

                (2. 消费) ↑ (3. 提交当前 offset)
------------------------------------------------------------------> consumer

kafka-consume-framework 封装了 kafka-node, 主要是因为后者提供的 API 存在错误: 如果没有注册 "message" 事件, 那么从 broker 拉去的消息都将流失而不会缓存下来, 也就是没有流的回压 & 流速控制特性.

为什么不注册事件监听器? 因为我们需要一个串行执行场景: 收到消息 -> 成功消费 -> 确认反馈 -> 继续消费. 对于一个数据源, 在 Node.js 里我们需要这样写:

consume = (msg) ->
  # process ...
  if (no_error)
    # ack
    consumer.commit (err, data) ->
      if (err)
        # other reconsume strategy
      else
        consumer.once "message", consume
  else
    # other reconsume strategy

consumer.once "message", consume

然而 kafka-node 在创建消费者后会不断 emit message 事件, 也就是说如果两次 once 注册之间存在异步操作, 那么很有可能导致获取到的数据不是连续的(异步操作耗时长一点的话甚至取不到后面的数据了).

所以 kafka-consume-framework 提供了一个 Pipe 模块, 给 kafka-node 装备成一个 stream. (具体实现上没有用 stream 模块原因是不合适):

{ EventEmitter } = require 'events'

class Pipe extends EventEmitter

  constructor: ({ @src, max_size: @MAX_SIZE = 1024 }) ->
    super()
    @_queue = []
    @_head = 0
    @_init()

  _init: ->
    @src.on "message", (message) => @emit "message", message
    @src.on "error", (err) => @emit "error", err

  reset: ->
    @_queue = []
    @_head = 0
    @src.resume()

  emit: (event, data) ->
    return super event, data if event isnt "message"
    if @listenerCount event
      super event, data
    else
      @_queue.push data
      @src.pause() if @_queue.length > @MAX_SIZE

  on: (event, callback) ->
    super event, callback
    callback data for data in @_queue
    @reset()

  once: (event, callback) ->
    if @_queue.length is 0
      super event, callback
      @reset()
    else
      callback @_queue[@_head]
      @reset() if ++@_head is @_queue.length

Pipe 类继承了 EventEmitter. 对于重载的 once/on 方法, 当内部缓冲队列里有数据时, 直接调用回调.

看到这里, 你应该察觉到重写的 once 不异步(监听器的执行没有放到 next tick 里), 这就有潜在的爆栈隐患, 可能导致了前面说的错误.

所以我把两个函数直接调用 callback 的情况改为用 setImmediate 放到 next tick 里, 再次运行, 果然没问题.

等等, 别高兴的太早. 问题真的是它么? 我们再来回顾下这段代码, 现在应该改成用 pipe 了:

consume = (msg) ->
  # process ...
  if (no_error)
    # ack
    consumer.commit commited
  else
    # other reconsume strategy

commited = (err, data) ->
  if (err)
    # other reconsume strategy
  else
    pipe.once "message", consume

pipe.once "message", consume

按这个流程来, 如果每个消息都是顺序执行的, 首先是根本不存在并发, 其次是就算 once 里直接执行了 callback, 但是因为 consumer.commit (kafka-node 的 API) 这个异步函数的存在, consume 函数会尽早退出, 而 pipe.once "message", consume 会被放到 next tick 里执行的, 所以说根本不存在爆栈的问题啊!

在仔细审阅了 kafka-consume-framework 代码后确定逻辑没有问题, 那么说问题出在 consumer.commit 这个函数上? 于是我把它替换成了最简单的 setImmediate, 没有问题. 然后又把 consumer.commit 的回调函数 commited 替换成了简单的 -> console.log "ACK", 同样没有任何问题. 但只要是 consumer.commit 搭配 pipe.once "message", consume 就出错.

那么什么情况下会出现爆栈问题? 除非 consumer.commit 不是异步函数!

翻看了 kafka-node 的代码, 找到问题的根源了: consumer.commit 时而是同步时而是异步:

function autoCommit (force, cb) {
  if (arguments.length === 1) {
    cb = force;
    force = false;
  }

  if (this.committing && !force) return cb(null, 'Offset committing');

  this.committing = true;
  setTimeout(function () {
    this.committing = false;
  }.bind(this), this.options.autoCommitIntervalMs);

  var payloads = this.payloads;
  if (this.pausedPayloads) payloads = payloads.concat(this.pausedPayloads);

  var commits = payloads.filter(function (p) { return p.offset !== 0; });
  if (commits.length) {
    this.client.sendOffsetCommitRequest(this.options.groupId, commits, cb);
  } else {
    cb(null, 'Nothing to be committed');
  }
}
Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit;

consumer.commit 其实有个 force 参数, 而官方文档里没写, 估计是处于性能因素考虑.

当我们某次提交成功时, 紧接着在 autoCommitIntervalMs 时间内立即产生另外一个提交, 这时如果 force 为 fasle, 那么 consumer.commit 函数直接变为同步:

return cb(null, 'Offset committing');

如果是这样, 那么问题就明朗了, 我们的代码在某一时刻会变成这样:

consume = (msg) ->
  # process ...
  if (no_error)
    # ack
    consume new_msg
  else
    # other reconsume strategy

pipe.once "message", consume

原来即便手动提交 offset, 连续提交时在内部也仍然会被无视, 而要等待自动提交周期一样长的时间.

为了证实猜测, 我分别做两次测试: 传入 force = true; 将 return cb(null, 'Offset committing'); 改为 return setImmediate(cb, null, 'Offset committing');

两次都成功完成了消费, 只是 force 为 true 时, 由于每个 commit 都要如实上报, 消费速率大打折扣, 确实特别慢.

到这里, 问题基本上已经被解决了, 想想为什么会出现此类问题呢? 似乎回调函数异步与否在大多数场景下都没什么太大影响. 现在已经不是要说谁来背锅的时候了, 我想起当年读过那本 <JavaScript 异步编程> 中作者的一段话, 大概是: 如果你的函数是异步的, 一定要提供个回调函数告诉使用者... 但如果是同步的, 提供这个回调函数往往会产生误导.

这也算是个实战中积累的教训吧.


BTW. 最开始要解决的问题要扔到明天了...