nats-io / nats-kafka

NATS to Kafka Bridging
Apache License 2.0

NewNATS2KafkaConnector always sends message to the same partition if keyType, keyValue and balancer are not specified. #69

Open cecilwei opened 2 years ago

cecilwei commented 2 years ago

Hi,

We are using this project to bridge NATS messages to Kafka, and we found that if keyType, keyValue and balancer are not specified, the connector always produces messages to the same Kafka partition.

We also found https://github.com/Shopify/sarama/issues/1957, which might be related.

func (conn *BridgeConnector) calculateKey(subject string, replyto string) []byte {
    keyType := conn.config.KeyType
    keyValue := conn.config.KeyValue

    if keyType == conf.FixedKey {
        return []byte(keyValue)
    }

    if keyType == conf.SubjectKey {
        return []byte(subject)
    }

    if keyType == conf.ReplyToKey {
        return []byte(replyto)
    }

    if keyType == conf.SubjectRegex {
        r, err := regexp.Compile(keyValue)

        if err != nil {
            conn.bridge.logger.Noticef("invalid regex for %s key value", conn.String())
            return []byte{}
        }

        result := r.FindStringSubmatch(subject)

        if len(result) > 1 {
            return []byte(result[1])
        }

        return []byte{}
    }

    if keyType == conf.ReplyRegex {
        r, err := regexp.Compile(keyValue)

        if err != nil {
            conn.bridge.logger.Noticef("invalid regex for %s key value", conn.String())
            return []byte{}
        }

        result := r.FindStringSubmatch(replyto)

        if len(result) > 1 {
            return []byte(result[1])
        }

        return []byte{}
    }

    return []byte{} // empty key by default
}

In BridgeConnector.calculateKey, if keyType is not set, an empty (non-nil) byte slice is returned. As a result, the partitioner created by NewHashPartitioner does not fall back to a random partition, so every message ends up on the same one.
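To illustrate the suspected cause, here is a minimal sketch that uses only the Go standard library. It assumes the hash partitioner treats a nil key as "no key" and picks a random partition, while any non-nil key, including an empty one, is hashed with FNV-1a. The pickPartition function is a hypothetical simplification written for this issue, not Sarama's actual code.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"math/rand"
)

// pickPartition is a simplified, hypothetical model of a hash partitioner.
// Assumption: a nil key means "no key", so a partition is chosen at random.
// Any non-nil key, including an empty slice, is hashed with FNV-1a.
func pickPartition(key []byte, numPartitions int32) int32 {
	if key == nil {
		return rand.Int31n(numPartitions)
	}
	h := fnv.New32a()
	h.Write(key) // hash.Hash.Write never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	// The empty key hashes to the same value on every call,
	// while the nil key is free to vary between calls.
	for i := 0; i < 5; i++ {
		fmt.Println("empty key:", pickPartition([]byte{}, 8), "nil key:", pickPartition(nil, 8))
	}
}
```

Under these assumptions, the "empty key" column never changes between iterations, which matches the behaviour we see with the bridge.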

Could you please check whether nil should be returned when keyType is not specified?
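One caveat if the fix goes this way: in Go, a nil slice wrapped in an interface value no longer compares equal to nil, so the check may need to happen where the key is converted for the producer message. The sketch below shows the pitfall and one possible guard. Encoder, byteEncoder and toKey are hypothetical stand-ins for illustration, not the bridge's or Sarama's real types.

```go
package main

import "fmt"

// Encoder and byteEncoder are hypothetical stand-ins for a client
// library's key types; they are not taken from the bridge or Sarama.
type Encoder interface {
	Encode() ([]byte, error)
}

type byteEncoder []byte

func (b byteEncoder) Encode() ([]byte, error) { return b, nil }

// toKey returns a nil interface when there is no key, so that a
// `key == nil` check in a partitioner can see the message is unkeyed.
func toKey(key []byte) Encoder {
	if len(key) == 0 {
		return nil
	}
	return byteEncoder(key)
}

func main() {
	var wrapped Encoder = byteEncoder(nil) // nil slice inside a non-nil interface
	fmt.Println(wrapped == nil)            // false
	fmt.Println(toKey(nil) == nil)         // true
	fmt.Println(toKey([]byte{}) == nil)    // true
}
```

With a guard like this, both a nil and an empty key would be passed on as "no key", assuming that is the behaviour wanted when keyType is not configured.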

Thanks