SOHU-Co / kafka-node

Node.js client for Apache Kafka 0.8 and later.
MIT License
2.66k stars 628 forks source link

ConsumerGroup: "Nothing to be committed" #1463

Open SuperZhangAo opened 2 years ago

SuperZhangAo commented 2 years ago
// Message handler for the fill consumer group: process the parsed payload
// first, then commit. Because the commit call sits after the awaited
// start_fill inside the try block, it is skipped whenever JSON.parse throws or
// start_fill rejects.
fillConsumerGroup.on('message', async function(message) {
  try {
    await ctx.service.consumerTest.start_fill(JSON.parse(message.value));
    // NOTE(review): the first argument is presumably kafka-node's `force`
    // flag -- confirm against the library README.
    fillConsumerGroup.commit(true, (err, data) => {
      if (err) {
        logger.error('commit:', err, data);
        return;
      }
      // A callback without an error is not a failure; keep it out of the
      // error log so status replies are not mistaken for commit errors.
      logger.info('commit:', data);
    });
  } catch (error) {
    // Parsing or processing failed: log the raw message with the error.
    logger.error('ERROR: [GetMessage] ', message, error);
  }
});

// Options for the "fill" consumer group.
const fillOptions = {
  // Broker address; connecting directly instantiates a KafkaClient.
  kafkaHost: config.kafkaHost,
  // Client batch settings go here when they are needed.
  batch: undefined,
  // Optional: false (the default) or a TLS options hash.
  ssl: false,
  id: `consumer-fill-${uuid}`,
  groupId: 'ExampleFillGroup',
  sessionTimeout: 15000,
  autoCommit: false,
  // Partition assignment protocols in order of preference. The built-ins are
  // the strings 'roundrobin' and 'range'; a custom protocol can be passed too.
  protocol: ['roundrobin'],
  // 'utf8' is the default; use 'buffer' for binary data.
  encoding: 'utf8',

  // Starting offset for new groups (equivalent to the Java client's
  // auto.offset.reset). 'latest' is the default; 'earliest' and 'none' are the
  // other options, and 'none' emits an error if no offsets were saved.
  fromOffset: 'latest',
  // The very first time this group subscribes to a topic, record the offset
  // that fromOffset (latest/earliest) resolves to.
  commitOffsetsOnFirstJoin: true,
  // Recovery policy for an OutOfRangeOffset error (saved offset is past the
  // server retention). Accepts the same values as fromOffset; 'earliest' is
  // the default.
  outOfRangeOffset: 'earliest',
  fetchMaxBytes: 16777216,
  // Hook (or null) giving consumers with autoCommit false a chance to commit
  // before a rebalance finishes. isAlreadyMember is false on the first
  // connection and true on rebalances triggered after that.
  onRebalance: (isAlreadyMember, done) => {
    done();
  },
};

Why does the commit callback report "Nothing to be committed"?