segmentio / kafka-go

Kafka library in Go
MIT License

Control the number of connections #304

Open VictorDenisov opened 5 years ago

VictorDenisov commented 5 years ago

Describe the solution you'd like
Reader and Writer in this library open a number of connections proportional to the number of partitions of the topic. A Writer can also only write to a single topic, so writing to another topic requires creating another Writer, which opens another batch of connections, again one per partition. This puts extra load on conntrack tables in cloud providers. It would be great to come up with an approach that opens only the connections necessary to talk to each broker and no more. It would also be great to share that set of connections between writers and readers.

Supporting documentation
Please provide links to relevant Kafka protocol docs and/or KIPs.

joaoferrao commented 5 years ago

@VictorDenisov we started writing a Kafka-intensive backend and have been noticing broken pipe errors like this one:

error writing messages to some-topic (partition 5): write tcp 123.123.123.123:36363->123.123.123.123:9092: write: broken pipe

The broken pipe errors seem to be random, and we have had a tough time pinpointing their origin. We are running 3 brokers inside a cloud-managed Kubernetes cluster. Our average microservice runs a single Reader and a single Writer, and the Writer writes to a single topic with multiple partitions.

Does your issue relate to something similar?

VictorDenisov commented 5 years ago

I don't think it's related. The problem I describe only becomes critical when you are reading from many topics simultaneously. Unlike the Confluent clients, this client opens as many connections per topic as the topic has partitions.

achille-roussel commented 5 years ago

I'm currently planning to work on improving connection management, but as usual, time spent on open-source projects needs to be planned around professional commitments, so things take time.

In the meantime, I'm happy to hear more about your requirements: for example, how many topics/partitions per process you are dealing with, what your connection limits are, how many producers/consumers run concurrently, etc.

VictorDenisov commented 5 years ago

In one case we have 6 topics with 24 partitions each. All together that becomes a pretty big number, especially given that, in addition to active connections, this client maintains a lot of connections in the TIME_WAIT state.

For producing, we need to be able to send to an arbitrary number of topics. We have topics whose names are determined on the fly, and we can have up to 1M of those topics.

arjunvofficial commented 5 years ago

Hey @VictorDenisov, I am facing an error while writing data to Kafka.

There are multiple writers, and they write to the same topic concurrently. Is there any way to solve this issue?

achille-roussel commented 5 years ago

Hello @arjunvofficial !

Timeouts may occur for many reasons in distributed systems, could you share more details about your configuration?

arjunvofficial commented 5 years ago

Hi @achille-roussel

We are using a dockerized Kafka setup; the docker-compose config is given below (truncated):

```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
```

The writer config we are using is given below:

```go
config := kafka.WriterConfig{
	Brokers:          bootstrapServers,
	Topic:            topic,
	BatchTimeout:     1 * time.Millisecond,
	MaxAttempts:      3,
	QueueCapacity:    1000,
	CompressionCodec: snappy.NewCompressionCodec(),
}

w := kafka.NewWriter(config)
```

We are creating multiple writers (sometimes more than a hundred) for the same topic, each running in its own goroutine, so data is written to Kafka by these writers concurrently.

ALTELMA commented 3 years ago

@arjunvofficial @achille-roussel Any updates? I am still facing the same issue when using multiple brokers: I get timeouts.

achille-roussel commented 2 years ago

Hello @ALTELMA

There is pending work on the kafka.Reader type to reduce the number of open connections, but this should not be an issue for kafka.Writer or kafka.Client use cases, as these now rely on the underlying kafka.Transport to manage connection pooling and reuse.

Would you have more details to share about your use case to help inform decisions here?

ALTELMA commented 2 years ago

@achille-roussel Thanks for your feedback. In my case, the config on our app server did not match the target Kafka services. Our problem is now solved, and I can confirm it did not come from your awesome package.

Thanks :)

kikyomits commented 2 years ago

Hi, this is a fairly old issue, but I bumped into it while searching for a way to limit the number of concurrent connections.

Use Case

Our architecture is AWS MSK with AWS Lambda. Our Lambda consumes events from a topic, transforms the data, and publishes to another topic. The topic has tens of thousands of messages, and we are planning to deploy a new Lambda function. At the initial deployment, we expect this Lambda to process all the existing messages in the topic.

Issue

MSK limits new connections to a rate of up to 20 TCP connections per broker per second, and we have more than 20 partitions for the topic. Since this library balances publishes across partitions and opens a new connection per partition, it's likely to hit the connection limit.

Proposal

Like @VictorDenisov mentioned, it seems like a good idea to me to have a MaxConnections parameter to limit how many new connections are opened. From reading the code, something similar is perhaps achievable via the BatchSize parameter, but that is not intuitive, and I would prefer a more self-explanatory solution. I'm new to this library, but I would love to contribute if this proposal looks good to you.

Thanks :)