mesos / kafka

Apache Kafka on Apache Mesos
Apache License 2.0
414 stars 140 forks source link

Support Multiple Kafka clusters #93

Closed SEJeff closed 8 years ago

SEJeff commented 9 years ago

So one approach I guess would be to run multiple versions of this frameworks against different znodes in zk (for HA mode), but I'd really like to be able to manage multiple kafka clusters with this.

We have several teams internally that use kafka, and have totally different configuration requirements. Some care more about replication and durability, some care more about performance, some have a bazillion topics and stripe over them with little to no replication.

It would be really nice to run a single instance of this framework using all of them.

joestein commented 9 years ago

You can run a single instance of the scheduler and then set each broker configuration for whatever values you want for properties and constraints, etc.

joestein commented 9 years ago

Maybe would could have some type of 'template' for types of clusters. I think maybe if we understood the use case and why you couldn't just use multiple schedulers it might help formulate what the feature/enhancement could look like.

SEJeff commented 9 years ago

@joestein Thanks, so I have multiple user groups internally, who have data that between some of the groups could be considered "proprietary" and privileged. For simplicity and auditing reasons, we'd want to have different kafka clusters running under our mesos cluster. My job (what a sucker) is more or less to make it easy for our users to utilize the mesos environment. This is a really really dumbed down PaaS more or less.

This framework, and several others, will be wrapped by an internal web service we wrote that essentially lets users choose what they want clicky/clicky or an API, that ties into our own LDAP/Kerberos ala:

In my envisioned use case, I'd have aurora start a copy of this framework, and preferably only a single one. Then my application could use the API to create "kafka as a service" as a supported, replicated, and automatically monitored service our Linux team could provide.

Alternatively, we could run multiple versions of this framework, but instead of the seemingly wasted duplication and extra management of that, could an extra query argument be added to the HTTP api call such as cluster_id=1 for kafka cluster 1, containing N nodes, or cluster_id=2, for kafka cluster 2? Then you can optionally specify a cluster in the cli, which always defaults to 1, the first cluster when none are specified.

That way you could keep backwards compatibility, and open up this framework to a whole new set of use cases. What do you think? Again, I could run multiple versions of this framework via Aurora, but that just seems suboptimal and a bit hacky to me. If possible, I'd prefer to not do that.

Note that I'll likely be building some preconfigured "templates" which will ultimately drive API calls to this framework in my wrapper application regardless. Supporting them natively would be a really slick way to go about things in this framework.

JohnOmernik commented 9 years ago

A big +1 here. As I just posted in the IP Per Container issue, multi tenancy and the ability to make it so users can provision their own kafka cluster without impacting production level clusters would be huge and really spawn innovation. Basically, people could pull off a shelf a cluster and make it work for them, no IT involvement... users (in development roles) can be granted the rights to use certain amount of resources and this is one of things they could create with those resources.

emdem commented 8 years ago

Huge +1

JohnOmernik commented 8 years ago

I was looking into this, and tried to get two clusters running and it appears we need a way to set the chroot for the ZK for each cluster, other than that, having multiple kafka clusters shouldn't be an issue.

This is what I did:

First an assumption: I am using MapR FS and have nfs mounted on every node, that affords me some flexibility in trying this stuff out as I can set broker data locations there, I can pull the tgz from file:// etc. It SHOULDN'T matter for this, but I am not an expert and wanted full disclosure.

I created a tgz for the scheduler for Marathon to pull down. It's basically the jar, the kafka-mesos.sh, and the tgz of the version of Kafka I will be using.

I started that in Marathon with the marathon file for prod (below). Basically I pull the tgz, but my kafka-mesos.properties file is in a nfs location (it's not in the tgz, I could add it too I guess, either way).

As you can see from my "prod" kafka-mesos.properties, I am using kafkaprod as (what I thought) was my ZK root. Note: This all works. I create brokers, I set the data location options with:

./kafka-mesos.sh broker update 0 --options log.dirs=/mapr/brewpot/data/kafka/kafkaprod/broker0/ ./kafka-mesos.sh broker update 1 --options log.dirs=/mapr/brewpot/data/kafka/kafkaprod/broker1/ ./kafka-mesos.sh broker update 2 --options log.dirs=/mapr/brewpot/data/kafka/kafkaprod/broker2/ ./kafka-mesos.sh broker update 3 --options log.dirs=/mapr/brewpot/data/kafka/kafkaprod/broker3/

Using the convention of /mapr/brewpot/data/kafka/%frameworkname%/%brokerid% This too all works.

So then I mimic all for kafkadev, changing it whereever it has kafkaprod to be kafkadev (including the zk: which I thought would address this).

However, when I try to run the kafka dev, and update the location etc, then start, I get the error below. I tried setting the zk.connect and zookeeper.connect (both failed with invalid options, zk.connect is on the kafka docs, zookeeper.connect is what is specified in the start command).

In the list of settings on the broker start, I see nothing about /brokers so that must be hard coded somewhere, but I can't seem to find it. ideally, I'd like to take /brokers and put it under my zk:/kafkaprod or zk:/kafkadev roots specified in the kafka-mesos.properties, but I am not sure how to do that. That seems to be the only thing keeping me having two clusters running on the same mesos cluster.

To validate that last statement, on my dev cluster, I prepended a 1 to each broker id, ie. instead of broker 0, 1, 2, I have brokers 10, 11, 12. When I did that, the brokers started with no errors and ran fine side by side with my "production" cluster. Thus, I do believe the only conflict between two clusters is the /brokers node which I am not sure how to adjust. Also of note: I am using Kafka 9.0 here.

Caveat: at this point if I look to /brokers, all brokers appears there in Zookeeper, thus if a producer or consumer where to query that node, they could get brokers from either cluster, obviously not ideal, and why that /brokers should be moved under /kafkaprod/brokers or /kafkadev/brokers.

On Start Error ( when starting brokers in dev, with prod already running)

[2016-01-06 08:04:37,426] INFO Creating /brokers/ids/2 (is it secure? false) (kafka.utils.ZKCheckedEphemeral) [2016-01-06 08:04:37,438] INFO Result of znode creation is: NODEEXISTS (kafka.utils.ZKCheckedEphemeral) [2016-01-06 08:04:37,439] FATAL [Kafka Server 2], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/2. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering. at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:295) at kafka.utils.ZkUtils.registerBrokerInZk(ZkUtils.scala:281) at kafka.server.KafkaHealthcheck.register(KafkaHealthcheck.scala:64) at kafka.server.KafkaHealthcheck.startup(KafkaHealthcheck.scala:45) at kafka.server.KafkaServer.startup(KafkaServer.scala:231) at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at ly.stealth.mesos.kafka.KafkaServer.start(BrokerServer.scala:51) at ly.stealth.mesos.kafka.Executor$.ly$stealth$mesos$kafka$Executor$$runBroker0$1(Executor.scala:76) at ly.stealth.mesos.kafka.Executor$$anon$1.run(Executor.scala:99)

kafka-mesos.properties for prod:

debug=false

storage=zk:/kafkaprod

master=zk://hadoopmapr4:5181,hadoopmapr5:5181,hadoopmapr6:5181/mesosha

zk=hadoopmapr4:5181,hadoopmapr5:5181,hadoopmapr6:5181

api=http://kafkaprod.marathonprod.mesos:7000

Marathon file for prod { "id": "kafkaprod", "instances": 1, "cmd": "cd kafka && ./kafka-mesos.sh scheduler /mapr/brewpot/mesos/prod/mesos-kafka/kafkaprod/kafka-mesos.properties", "cpus": 1, "mem": 256, "labels": { "PRODUCTION_READY":"True", "ZETAENV":"Prod", "CONTAINERIZER":"Mesos" }, "uris": ["file:///mapr/brewpot/mesos/prod/mesos-kafka/kafkaprod/kafka-mesos-0.9.4.0.tgz"] }

JohnOmernik commented 8 years ago

Ok, I missed something in the docs (actually I needed to look at another issue) basically, I needed to set the chroot path with:

zk=hadoopmapr4:5181,hadoopmapr5:5181,hadoopmapr6:5181/kafkadev

I had incorrectly assumed that

storage=zk:/kafkadev

set that chroot.

Now I storage back to

storage=zk:/kafka-mesos

and

under /kafkadev is ./kafka-mesos and ./brokers

So I am propery CHrooted

At this point, I think I have two separate kafka clusters running on a single mesos cluster with no issues, not sure if that warrants closing this or not, but it's working nicely.

SEJeff commented 8 years ago

It doesn't, and is the obvious thing, which I also did. Thanks for the excellent detail however. I was just hoping to use this framework to provide "kafka as a service" instead of having to spin up N copies of it. That is very similar to what I ultimately did, but with Apache Aurora instead.

JohnOmernik commented 8 years ago

Well, my goal was to keep things separate. The reasoning being if I have protected data, or protected roles on my Mesos cluster. This partitioning allowed me to keep things separate without changing how kafka operates. Basically, by using mesos-kafka, I can provide kafka as a service, to every group that wants one, I can provide that to them easily. I just needed work through the zk stuff.

SEJeff commented 8 years ago

Closing as I just noticed a dcos release was released which pretty much does what I ended up doing, which is running another framework instance via marathon.