knative-extensions / eventing-kafka

Kafka integrations with Knative Eventing.
Apache License 2.0

KafkaSource Adapter Error in Knative Eventing - Claims Update Timeout and Invalid Length Decoding #1347

Closed B3ns44d closed 1 year ago

B3ns44d commented 1 year ago

Describe the bug

Hello,

I've been experiencing issues with the KafkaSource adapter in my Knative Eventing setup. The error messages point to two problems: timeouts when sending claims updates, and failures when decoding packet lengths. Below are the relevant details:

Error Messages from KafkaSource Adapter Logs:

Knative release version:

For the claims update error, this is the code that logs the warning:

file: consumer/consumer_handler.go

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *SaramaConsumerHandler) Setup(session sarama.ConsumerGroupSession) error {
    consumer.logger.Info("setting up handler")
    consumer.lifecycleListener.Setup(session)
    return nil
}

func (a *Adapter) Setup(sess sarama.ConsumerGroupSession) {
    if a.controlServer != nil {
        if err := a.controlServer.SendAndWaitForAck(kafkasourcecontrol.NotifySetupClaimsOpCode, kafkasourcecontrol.Claims(sess.Claims())); err != nil {
            a.logger.Warnf("Cannot send the claims update: %v", err)
        }
    }

    // Preemptively initialize consumer group offsets to be able to mark the source as ready
    // as soon as possible.
    if err := a.InitOffsets(sess); err != nil {
        a.logger.Warnf("Cannot initialized consumer group offsets: %v", err)
    }
}

Code from the Knative control-protocol that returns the timeout:

file: pkg/service/service.go

func (c *service) SendAndWaitForAck(opcode ctrl.OpCode, payload encoding.BinaryMarshaler) error {
    b, err := payload.MarshalBinary()
    if err != nil {
        return err
    }
    return c.sendBinaryAndWaitForAck(opcode, b)
}

func (c *service) sendBinaryAndWaitForAck(opcode ctrl.OpCode, payload []byte) error {
    if opcode == ctrl.AckOpCode {
        return fmt.Errorf("you cannot send an ack manually")
    }
    msg := ctrl.NewOutboundMessage(uint8(opcode), payload)

    logging.FromContext(c.ctx).Debugf("Going to send message with opcode %d and uuid %s", msg.OpCode(), msg.UUID().String())

    // Register the ack between the waiting acks
    ackCh := make(chan interface{}, 1)
    c.waitingAcksMutex.Lock()
    c.waitingAcks[msg.UUID()] = ackCh
    c.waitingAcksMutex.Unlock()

    defer func() {
        c.waitingAcksMutex.Lock()
        delete(c.waitingAcks, msg.UUID())
        c.waitingAcksMutex.Unlock()
    }()

    c.connection.OutboundMessages() <- &msg

    select {
    case <-ackCh:
        return nil
    case <-c.ctx.Done():
        logging.FromContext(c.ctx).Warnf("Dropping message because context cancelled: %s", msg.UUID().String())
        return c.ctx.Err()
    case <-time.After(controlServiceSendTimeout):
        logging.FromContext(c.ctx).Debugf("Timeout waiting for the ack: %s", msg.UUID().String())
        return fmt.Errorf("timeout exceeded for outgoing message: %s", msg.UUID().String())
    }
}

I'm seeking assistance in understanding the root cause of these errors and how to rectify them. Any insights, suggestions, or guidance would be greatly appreciated.

Thank you for your support!

github-actions[bot] commented 1 year ago

This issue is stale because it has been open for 90 days with no activity. It will automatically close after 30 more days of inactivity. Reopen the issue with /reopen. Mark the issue as fresh by adding the comment /remove-lifecycle stale.