segmentio / kafka-go

Kafka library in Go
MIT License

CreateTopics returns io.EOF #118

Closed briansorahan closed 5 years ago

briansorahan commented 6 years ago

Package version: 48c37f796910d8154479aaa04ade5843ae4f55d0

Example code: https://gist.github.com/briansorahan/0e6fac07a3076a83e36d80ae55ec48d3

I'm testing CreateTopics on an existing topic, since the idempotent behavior would be very nice to have. Is it expected to get io.EOF?

got io.EOF
2018/09/24 17:11:45 creating topics: EOF
exit status 1

achille-roussel commented 6 years ago

This definitely sounds like it contradicts the behavior described in the documentation.

The fix doesn't seem too hard (checking for io.EOF). Would you be able to contribute a fix for this?

briansorahan commented 6 years ago

@achille-roussel I would love to contribute a patch. Should we just check for io.EOF here and return nil?

briansorahan commented 6 years ago

If yes ^ this seems like a really small change, and I'd like to contribute a test too.

Here are the test failures on my fork:

# github.com/briansorahan/kafka-go_test
./compression_test.go:35:11: cannot assign int to r1 (type kafka.Message) in multiple assignment
./compression_test.go:35:21: not enough arguments in call to m.CompressionCodec.Encode
        have ()
        want ([]byte, []byte)
./compression_test.go:41:11: cannot assign int to r2 (type kafka.Message) in multiple assignment
./compression_test.go:41:22: not enough arguments in call to r1.CompressionCodec.Decode
        have ()
        want ([]byte, []byte)
./compression_test.go:127:24: not enough arguments in call to msg.CompressionCodec.Encode
        have ()
        want ([]byte, []byte)
./compression_test.go:132:26: m1.Value undefined (type int has no field or method Value)
./compression_test.go:134:14: m1.Decode undefined (type int has no field or method Decode)
FAIL    github.com/briansorahan/kafka-go [build failed]

With a little guidance on how to fix them, I'd be happy to do that too.

briansorahan commented 6 years ago

After getting past these errors ^ I discovered that some of the tests also need a Kafka broker running locally.

At least that's how I interpret this:

--- FAIL: TestDialer (0.00s)
    --- FAIL: TestDialer/looking_up_partitions_returns_the_list_of_available_partitions_for_a_topic (0.01s)
        reader_test.go:262: bad conn
        dialer_test.go:55: dial tcp 127.0.0.1:9092: connect: connection refused

I would suggest putting a build tag on these, something like

// +build integration

so that users can run go test without a lot of fuss.

wdyt?
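The build-tag suggestion could look like the following fragment at the top of each test file that needs a live broker. `//go:build` is the syntax used since Go 1.17; the `// +build` line is the older form from the comment, kept so older toolchains honor the same constraint. `TestBrokerReachable` is a hypothetical placeholder, not one of kafka-go's existing tests.

```go
//go:build integration
// +build integration

package kafka_test

import (
	"net"
	"testing"
)

// TestBrokerReachable (hypothetical) needs a broker listening on
// 127.0.0.1:9092, the address the failing tests above dial.
func TestBrokerReachable(t *testing.T) {
	conn, err := net.Dial("tcp", "127.0.0.1:9092")
	if err != nil {
		t.Fatal(err)
	}
	conn.Close()
}
```

Files carrying this constraint are compiled only by `go test -tags integration ./...`; a plain `go test ./...` skips them.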

abraithwaite commented 6 years ago

I would be careful about adding io.EOF checks, @achille-roussel. Remember that Kafka proactively kills the connection on protocol errors; this could be one of those.

There are 4 versions of CreateTopics in the Kafka protocol:

https://kafka.apache.org/protocol#The_Messages_CreateTopics

I'm not sure which we're using in kafka-go, but we'd need to rule that out.

@Pryz, do we know an exact version of Kafka that kafka-go works with for sure? We should put that in the README.
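One way to rule out a request-version mismatch is the negotiation Kafka clients can perform using the broker's ApiVersions response (available since Kafka 0.10.0): pick the highest version of a request, such as CreateTopics (API key 19), that both sides support. The sketch shows only that selection step; `versionRange` and `pickVersion` are hypothetical names, not kafka-go API, and the example ranges are illustrative rather than the real support matrix of any Kafka release.

```go
package main

import "fmt"

// versionRange is the inclusive range of versions of one Kafka API
// (for example CreateTopics) that a client or a broker supports.
type versionRange struct {
	Min, Max int16
}

// pickVersion returns the highest version both sides support, or false if
// the ranges do not overlap. Sending a version outside the broker's range is
// the kind of protocol error after which a broker may close the connection,
// which the client would then observe as io.EOF.
func pickVersion(client, broker versionRange) (int16, bool) {
	lo := client.Min
	if broker.Min > lo {
		lo = broker.Min
	}
	hi := client.Max
	if broker.Max < hi {
		hi = broker.Max
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	client := versionRange{Min: 0, Max: 2}
	oldBroker := versionRange{Min: 0, Max: 0} // illustrative numbers only
	v, ok := pickVersion(client, oldBroker)
	fmt.Println(v, ok) // 0 true
}
```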

Pryz commented 5 years ago

@abraithwaite It's more about the message format version than the Kafka version. kafka-go will work fine with all Kafka versions as long as you use the v0 and v1 message formats. See: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
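To illustrate the difference between a message format version and a broker version: legacy message sets (v0/v1) and v2 record batches both store their format version in a "magic" byte at the same position, byte 16, so a reader can check it before decoding the rest. The sketch assumes a raw byte buffer; `messageFormatVersion` and `supportedByClient` are hypothetical helpers, and treating only v0 and v1 as supported mirrors the comment above rather than any actual kafka-go check.

```go
package main

import (
	"errors"
	"fmt"
)

// magicOffset is the byte offset of the "magic" (format version) field in both
// the legacy layout (offset int64, size int32, crc int32, magic int8) and the
// v2 record batch layout (baseOffset int64, batchLength int32,
// partitionLeaderEpoch int32, magic int8).
const magicOffset = 16

// messageFormatVersion reads the magic byte from the start of a raw
// message set or record batch.
func messageFormatVersion(b []byte) (int8, error) {
	if len(b) <= magicOffset {
		return 0, errors.New("buffer too short to contain a magic byte")
	}
	return int8(b[magicOffset]), nil
}

// supportedByClient reports whether v is one of the v0/v1 message formats.
func supportedByClient(v int8) bool { return v == 0 || v == 1 }

func main() {
	buf := make([]byte, 17)
	buf[magicOffset] = 2 // pretend this is a v2 record batch header
	v, err := messageFormatVersion(buf)
	fmt.Println(v, err, supportedByClient(v)) // 2 <nil> false
}
```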

briansorahan commented 5 years ago

Couldn't we just document that the way to achieve idempotency is to check for TopicAlreadyExists? In other words, can we just get rid of the switch?

briansorahan commented 5 years ago

In fact, when I set up my little test I was expecting to see something like this ^

frodopwns commented 5 years ago

I too am running into this issue. In my case, I get the EOF error when the topic does not exist and it does not get created.

Not sure if my request is wrong or if the client needs an update to be in line with the newer Kafka API.

Using kafka_2.11-v1.1.1-rc1

achille-roussel commented 5 years ago

@briansorahan I'm not sure how different it would be to return the "TopicAlreadyExists" error rather than checking for err == nil. Do you have an example of how this would help you?

@abraithwaite you're making a good point that EOF may also mean that the topic wasn't created... but in that case any future attempt to read from or write to the topic will fail.

briansorahan commented 5 years ago

@achille-roussel I'm suggesting that we avoid hiding TopicAlreadyExists from callers since they can just detect it and make their own decision. As long as we document which errors the method can return, and how to interpret each one, then it's up to the caller to implement their own policy.

achille-roussel commented 5 years ago

I’m a bit concerned about breaking programs that depend on the current behavior, but at the same time it’s true that we have no way of detecting if a topic already existed right now, so it seems like this is something that needs fixing.

Would you be able to send a pull request to make the change?

briansorahan commented 5 years ago

I can submit a PR with that change, but I'd like to document io.EOF and what it means.

achille-roussel commented 5 years ago

@briansorahan do you still want to make this happen? I think the idea of not making CreateTopics idempotent is probably an improvement.

Regarding io.EOF, it always seems unexpected to get this error from CreateTopics, so we may want to wrap it with a more descriptive error message (though I'm not sure under what conditions Kafka would close the connection in this case).

briansorahan commented 5 years ago

Honestly, I've lost interest in this since confluent-kafka-go now supports creating topics.

stevevls commented 5 years ago

Echoing @abraithwaite, this sounds very much like a case of the broker not understanding the request version and closing the connection. It looks like we were sending v2 of the CreateTopicsRequest up until #81. Now we're sending v0, which has wider support, so I'd expect this to no longer be an issue.

stevevls commented 5 years ago

Closing this issue. Please re-open if you see this behavior on Kafka >= 0.10.0!