Open NeoZephyr opened 2 years ago
func (k *kafka) start() error { ps, _ := k.client.Partitions(k.opt.topic) for _, p := range ps { pom := k.offsetManager.ManagePartition(k.opt.topic, p) offset := pom.NextOffset() if offset == -1 { offset = sarama.OffsetOldest } c := k.baseConsumer.ConsumePartition(k.opt.topic, p, offset) consumer := newConsumer(k, c, pom) k.consumers = append(k.consumers, consumer) k.wg.Add(1) go consumer.start() } return nil } func (k *kafka) Close() error { for _, c := range k.consumers { if err := c.close(); err != nil { log.Warn("close consumer error: %s", err) } } k.wg.Wait() return nil } func (c *consumer) close() error { c.closeCh <- struct{}{} c.pom.Close() return c.consumer.Close() } func (c *consumer) start() { defer c.wg.Done() var err error var value []byte for { select { case msg := <-c.consumer.Messages(): c.pom.MarkOffset(msg.Offset+1, "") if err = json.Unmarshal(msg.Value, dao); err != nil { continue } case <-c.closeCh: return } } }