nats-io / nats-kafka

NATS to Kafka Bridging
Apache License 2.0

Batching of messages happening when connecting to Event Hub Kafka #21

Closed: lmtyler closed this issue 3 years ago

lmtyler commented 3 years ago

When connecting to the Kafka API of Azure Event Hubs, a single message is buffered during consumption until a second message is sent to the same partition. I have confirmed that this is not a message-size issue and that it does not occur with Sarama or librdkafka.

Producer example

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "log"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
    w := kafka.NewWriter(kafka.WriterConfig{
        Brokers: []string{"your-eventhub.servicebus.windows.net:9093"},
        Topic:   "your-topic",
        Dialer: &kafka.Dialer{
            Timeout:   5 * time.Second,
            DualStack: true,
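            // Verify the Event Hubs server certificate (it is publicly
            // trusted) instead of disabling checks with InsecureSkipVerify.
            TLS: &tls.Config{
                MinVersion: tls.VersionTLS12,
            },
            SASLMechanism: plain.Mechanism{
                // Event Hubs expects the literal username "$ConnectionString"
                // and the namespace connection string as the password.
                Username: "$ConnectionString",
                Password: "your-password",
            },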
        },
        Balancer:  &kafka.Hash{},
        BatchSize: 1, // send every message in its own batch
        Async:     false,
    })

    for i := 0; i < 10; i++ {
        msg := fmt.Sprintf("message text %d %s", i, time.Now().Format(time.RFC3339Nano))
        log.Printf("msg: %s", msg)
        err := w.WriteMessages(context.Background(), kafka.Message{
            Value: []byte(msg),
        })
        if err != nil {
            log.Fatal("failed to write messages:", err)
        }
        time.Sleep(5 * time.Second)
    }

    if err := w.Close(); err != nil {
        log.Fatal("failed to close writer:", err)
    }
}

Consumer sample

package main

import (
    "context"
    "crypto/tls"
    "fmt"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)

func main() {
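    // One reader per partition, bypassing consumer groups; this assumes the
    // event hub has (at least) five partitions, numbered 0-4.
    for i := 0; i < 5; i++ {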
        go func(p int) {
            log.Printf("starting thread for %d\n", p)
            r := kafka.NewReader(kafka.ReaderConfig{
                Brokers:   []string{"your-eventhub.servicebus.windows.net:9093"},
                // GroupID:   "$Default",
                Partition: p,
                Topic:     "your-topic",
                Dialer: &kafka.Dialer{
                    Timeout:   5 * time.Second,
                    DualStack: true,
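                    // Verify the Event Hubs server certificate (it is publicly
                    // trusted) instead of disabling checks with InsecureSkipVerify.
                    TLS: &tls.Config{
                        MinVersion: tls.VersionTLS12,
                    },
                    SASLMechanism: plain.Mechanism{
                        // Event Hubs expects the literal username "$ConnectionString"
                        // and the namespace connection string as the password.
                        Username: "$ConnectionString",
                        Password: "your-password",
                    },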
                },
                MinBytes: 1,    // ask the broker to answer as soon as any data is available
                MaxBytes: 10e5, // at most 1 MB per fetch
            })

            for {
                m, err := r.ReadMessage(context.Background())
                if err != nil {
                    log.Printf("partition %d: read failed: %v", p, err)
                    break
                }
                fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
            }

            if err := r.Close(); err != nil {
                log.Fatal("failed to close reader:", err)
            }

        }(i)
    }

    waitForExit()
}

func waitForExit() {
    // signal.Notify does not block when sending, so the channel needs a buffer.
    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGINT)
    <-ch
    os.Exit(0)
}
ColinSullivan1 commented 3 years ago

Thanks for raising this issue @lmtyler , we'll take a look.

ColinSullivan1 commented 3 years ago

Could be related to this: https://github.com/segmentio/kafka-go/issues/384

lmtyler commented 3 years ago

The more I work with this Kafka connector, the more I think the rebalancer has an issue as well. Again, it might be something specific to Microsoft's Kafka implementation that is causing the problems. Since this has to be reworked for JetStream anyway, might I suggest switching to https://github.com/Shopify/sarama?
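
If "rebalancer" here means consumer-group partition assignment (an assumption; the consumer sample has `GroupID` commented out), it helps to know what a correct assignment looks like. The sketch below implements Kafka's range strategy, which kafka-go exposes as `RangeGroupBalancer`; `rangeAssign` is a hypothetical stdlib-only helper for a single topic, not either library's code.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeAssign assigns partitions 0..numPartitions-1 of a single topic to the
// given members the way the range strategy does: members are sorted by ID,
// each receives numPartitions/len(members) consecutive partitions, and the
// first numPartitions%len(members) members receive one extra.
func rangeAssign(memberIDs []string, numPartitions int) map[string][]int {
	members := append([]string(nil), memberIDs...) // don't reorder the caller's slice
	sort.Strings(members)
	out := make(map[string][]int, len(members))
	if len(members) == 0 {
		return out
	}
	per, extra := numPartitions/len(members), numPartitions%len(members)
	next := 0
	for i, m := range members {
		n := per
		if i < extra {
			n++
		}
		parts := make([]int, 0, n)
		for j := 0; j < n; j++ {
			parts = append(parts, next)
			next++
		}
		out[m] = parts
	}
	return out
}

func main() {
	// Five partitions (as read by the consumer sample) shared by two members.
	assignment := rangeAssign([]string{"consumer-b", "consumer-a"}, 5)
	for _, m := range []string{"consumer-a", "consumer-b"} {
		fmt.Printf("%s -> %v\n", m, assignment[m])
	}
}
```

Comparing the assignments a group actually receives after each rebalance with this expected result is one way to tell whether the problem lies in the client's balancer or in how the broker coordinates the group.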

lmtyler commented 3 years ago

To me, https://github.com/segmentio/kafka-go/blob/master/balancer.go#L116-L166 suggests that kafka-go took ideas and logic from Sarama.
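
Sarama's default hash partitioner hashes the message key with 32-bit FNV-1a, reinterprets the hash as a signed `int32`, takes it modulo the partition count and then the absolute value; kafka-go's `Hash` balancer carries a comment saying it uses the same algorithm, which is consistent with the reading above. The sketch below reproduces that scheme with the standard library; treat the exact details as an assumption to check against the linked `balancer.go` and Sarama's `hashPartitioner`. `hashPartition` and `unsignedPartition` are hypothetical names.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// hashPartition maps a message key to a partition: 32-bit FNV-1a over the
// key, reinterpreted as a signed int32, modulo the partition count, then made
// non-negative. Keeping the int32 conversion is what makes the result match
// a partitioner that works on int32 (assumption: Sarama's hashPartitioner).
func hashPartition(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key) // writes to a hash.Hash never return an error
	p := int32(h.Sum32()) % int32(numPartitions)
	if p < 0 {
		p = -p
	}
	return int(p)
}

// unsignedPartition is the naive alternative that skips the int32
// conversion; it can pick a different partition for the same key.
func unsignedPartition(key []byte, numPartitions int) int {
	h := fnv.New32a()
	h.Write(key)
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	for _, k := range []string{"order-1", "order-2", "order-3", "order-4"} {
		fmt.Printf("%-8s signed=%d unsigned=%d\n", k,
			hashPartition([]byte(k), 5), unsignedPartition([]byte(k), 5))
	}
}
```

Because the mapping depends only on the key and the partition count, two clients that implement it identically send a given key to the same partition, while a client that skips the `int32` conversion generally does not.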

variadico commented 3 years ago

Closing, since we've now swapped libraries.