evan-stripe closed this pull request 6 years ago.
Can one of the admins verify this patch?
Hmm. I'm having a hard time figuring out why the tests keep failing, but it seems like timeouts talking to ZooKeeper rather than anything I've changed. Not sure what to do about that.
~Instead of passing Kafka headers as HTTP headers in the produce API I would rather make them a header parameter.~

Scratch that. It does make sense to pass headers as... well, headers.
There is no consume-with-headers test in `service_http_test.go`. Preferably tests in `service_http_test.go` and `service_grpc_test.go` should be similar.
@evan-stripe overall I like it. Just a few minor and easily fixable comments. Thank you for your contribution! @thrawn01 do you have any comments?
Thanks for the feedback! I think I've incorporated all of your requests into the latest version of the patch, but let me know if I've missed anything.
Two very minor comments. Otherwise I am ready to merge. But before doing that I would like another pair of eyes primarily on the HTTP interface, the GRPC is solid. @thrawn01 please take a look.
Oh, almost forgot: if you could update the README and CHANGELOG that would be great. But let's do that after @thrawn01 gives his go-ahead on the changes.
Maybe we should say that headers are expected to be UTF-8 strings, and if a use case demands binary then it is the client's responsibility to base64-encode or whatever? Just thinking out loud, I am not against base64 encoding.
I think Sarama may actually be wrong to expose the header name as a `[]byte` instead of a `string`. Both the Java API and the KIP use a string: https://kafka.apache.org/11/javadoc/org/apache/kafka/common/header/Header.html, and Kafka protocol strings are decoded as UTF-8.
So, I think we can definitely get away with using strings in the HTTP publish interface. I'm now wondering whether we should change the GRPC interfaces and the HTTP consumer interface to use strings as well. Thoughts?
Given what you said about the Java implementation, I vote for headers to be strings all the way through.
Alright, updated everything to be strings.
This is really awesome! I tested it out and added support for it in the CLI (See #147)
I ran into some usability issues, though, that might throw some people off.

- Headers require setting `KAFKA_VERSION` in the environment, which means adding a header will fail unless `KAFKA_VERSION=0.11.0.2` or better.
- `RecordHeader.encode()` and `RecordHeader.decode()`: JSON marshalling of the `RecordHeader` struct will always encode the value. This threw me when I was un-marshalling values for the CLI. It's not a big deal, as the `Record` struct also has this issue. (See https://github.com/mailgun/kafka-pixy/pull/147/files#diff-535587b8b2c65359a863da6d2191b33dR521)

Neither of these is a blocking issue, and it probably isn't anything we would fix.
Hmm. Given that values are arbitrary byte arrays (not necessarily UTF-8), I'm not entirely sure how to make the types more ergonomic here; I think you have to base64-encode by default.
Totally agreed on the ergonomics of the version number, though - I ran into this when I was originally developing this feature. We hadn't bumped the version setting from the default, and the patch we're running internally didn't have the error message, and it was really challenging to debug. (It'd be super nice if Sarama actually used the ApiVersions API to negotiate the protocol versions, instead of requiring them to be hardcoded.)
There is a version number option that you can put in the config file, though. One option would be to make the error message more verbose; maybe something like "headers are not supported with this version of kafka; update proxies.kafka.version in your config file or set $KAFKA_VERSION", but that's pretty wordy for a Go error message.
Just wanted to comment but pressed the close button by accident, sorry... I told myself again and again do not work until you have your morning coffee...
Can one of the admins verify this patch?
Please rebase and I will merge.
...and now that we agreed on API, please, update the README and CHANGELOG.
Alright, I took a stab at updating the README and the CHANGELOG.
@evan-stripe looks like a test needs to be fixed:

```
service_grpc_test.go:237:
    c.Check(grpcStatus.Message(), Equals, "headers are not supported with this version of Kafka")
... obtained string = "headers are not supported with this version of Kafka. Consider changing `kafka.version` (https://github.com/mailgun/kafka-pixy/blob/master/default.yaml#L35)"
... expected string = "headers are not supported with this version of Kafka"
```
Bah that's what I get for making the error messages better 😛
Fix coming momentarily...
:tada:
Thanks for all the feedback and help!
This adds support for specifying and receiving Kafka record headers (added in Kafka 0.11) to both the HTTP and gRPC interfaces. For gRPC, headers are simply added as an additional field to the `ProdRq` and `ConsRs` structs - easy. For consuming over HTTP, they're added as an additional field - also easy.
For producing over HTTP, the HTTP server looks for any HTTP header with the prefix `X-Kafka-`. It strips that prefix to get the Kafka record header key, and treats the HTTP header value as a base64-encoded value.
I'm open to feedback on the interfaces here - a few things that I rejected but I could be wrong about:
Record headers are only supported on Kafka version 0.11 or later, so if the `Version` config variable isn't set to something sufficiently new, Sarama will silently ignore headers. It might be desirable to return an error if the cluster configuration doesn't support producing messages with headers, but it doesn't seem like that's currently threaded through `producer.T` or `consumer.T`, so I'm not sure how to detect that case.