Open banzhihang opened 3 years ago
// Consume messages, watch signals func (j *Job) Consume() { for { select { case err := <-j.consumer.Errors(): log.Errorf("consumer error(%v)", err) case n := <-j.consumer.Notifications(): log.Infof("consumer rebalanced(%v)", n) case msg, ok := <-j.consumer.Messages(): if !ok { return } j.consumer.MarkOffset(msg, "") // process push message pushMsg := new(pb.PushMsg) if err := proto.Unmarshal(msg.Value, pushMsg); err != nil { log.Errorf("proto.Unmarshal(%v) error(%v)", msg, err) continue } if err := j.push(context.Background(), pushMsg); err != nil { log.Errorf("j.push(%v) error(%v)", pushMsg, err) } log.Infof("consume: %s/%d/%d\t%s\t%+v", msg.Topic, msg.Partition, msg.Offset, msg.Key, pushMsg) } } }
这里job阻塞消费kafka会不会导致消费速度太慢,出现消息堆积呢
实践中会存在的,需要保证发得够快
这里job阻塞消费kafka会不会导致消费速度太慢,出现消息堆积呢