optiopay / kafka

Go driver for Kafka
https://godoc.org/github.com/optiopay/kafka
MIT License
216 stars 57 forks source link

Using proto.RequiredAcksNone causes panic #47

Closed keep94 closed 8 years ago

keep94 commented 8 years ago

I have written a very simple program to send two messages to a KAFKA cluster. The program queries the broker for the partition count and uses that count to create a Distributed Producer to distribute the load across the partitions.

The program works fine if I use conf.RequiredAcks = proto.RequiredAcksLocal. But I am willing to give up some durability guarantees to lower latency. Therefore, I don't want to wait for any acks.

When I change my program to use proto.RequiredAcksNone, it panics like this:

goroutine 1 [running]:
github.com/optiopay/kafka.(*producer).produce(0xc820040380, 0x5f2f20, 0xb, 0x0, 0xc82000b250, 0x2, 0x2, 0x0, 0x0, 0x0)
    /Users/me/mygo/src/github.com/optiopay/kafka/broker.go:816 +0x128d
github.com/optiopay/kafka.(*producer).Produce(0xc820040380, 0x5f2f20, 0xb, 0x0, 0xc82000b250, 0x2, 0x2, 0x0, 0x0, 0x0)
    /Users/me/mygo/src/github.com/optiopay/kafka/broker.go:742 +0xcb
github.com/optiopay/kafka.(*roundRobinProducer).Distribute(0xc82000e940, 0x5f2f20, 0xb, 0xc82000b250, 0x2, 0x2, 0x1dcd6500, 0x0, 0x0)
    /Users/me/mygo/src/github.com/optiopay/kafka/distributing_producer.go:85 +0xf7
main.main()
    /Users/me/mygo/src/github.com/keep94/kafka/kafka.go:39 +0xabd

Here is my go program:

package main

import (
        "fmt"
        "github.com/optiopay/kafka"
        "github.com/optiopay/kafka/proto"
        "log"
)

func main() {
        addresses := []string{
                "192.168.1.1:9092",
                "192.168.1.2:9092",
                "192.168.1.3:9092",
                "192.168.1.4:9092",
                "192.168.1.5:9092",
        }
        broker, err := kafka.Dial(addresses, kafka.NewBrokerConf("foo"))
        if err != nil {
                log.Fatal(err)
        }
        defer broker.Close()
        var count int32
        count, err = broker.PartitionCount("metricTopic")
        if err != nil {
                log.Fatal(err)
        }
        fmt.Println("count = ", count)
        conf := kafka.NewProducerConf()
        // Using this line works
        conf.RequiredAcks = proto.RequiredAcksLocal
       // Using this line causes dist.Distribute below to panic.
        conf.RequiredAcks = proto.RequiredAcksNone

        producer := broker.Producer(conf)
        dist := kafka.NewRoundRobinProducer(producer, count)
        messages := []*proto.Message{
                {Value: []byte("Whos on first?")},
                {Value: []byte("Whos on second?")},
        }
        if _, err := dist.Distribute("metricTopic", messages...); err != nil {
                log.Fatal(err)
        }
}
husio commented 8 years ago

@keep94 sadly, we never used NoACK option and looks like it was not fully implemented. Please check again, your code should work now. Remember that without ack you get no offset response and none of produced message attributes are updated.