twmb / franz-go

franz-go contains a feature complete, pure Go library for interacting with Kafka from 0.8.0 through 3.6+. Producing, consuming, transacting, administrating, etc.
BSD 3-Clause "New" or "Revised" License

Expected behavior when producing to non-existent partition #711

Closed alistairking closed 1 month ago

alistairking commented 2 months ago

Hey again,

We recently had a bug in some code that was doing manual assignment of partitions to messages where it was configured with an incorrect number of partitions and so was producing messages to partitions that did not exist (the topic had 100 partitions and the partitioning code was assigning messages across 1000).

Naively I would have expected the produce to fail or at worst to log an error, but from what I could tell the message was silently dropped (whether by franz or by Kafka I don't know)?

Now, obviously the solution is to use the correct number of partitions (and probably asking Kafka for this number is better than statically configuring it), but I'm curious as to whether this is the expected behavior when one does produce to a partition that does not exist.
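The failure mode described above (a partitioner configured for 1000 partitions on a 100-partition topic) can be caught before a record reaches the client. The following is a minimal standard-library sketch, not code from the issue: `partitionFor` and `checkPartition` are hypothetical helpers, and the real partition count is assumed to have been fetched from the cluster's topic metadata rather than hard-coded.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor is a hypothetical helper that hashes key onto one of
// numPartitions partitions. numPartitions should come from the cluster's
// topic metadata rather than from static configuration.
func partitionFor(key []byte, numPartitions int32) (int32, error) {
	if numPartitions <= 0 {
		return 0, fmt.Errorf("invalid partition count %d", numPartitions)
	}
	h := fnv.New32a()
	h.Write(key) // hash.Hash.Write never returns an error
	return int32(h.Sum32() % uint32(numPartitions)), nil
}

// checkPartition rejects a manually chosen partition that the topic
// does not have.
func checkPartition(p, numPartitions int32) error {
	if p < 0 || p >= numPartitions {
		return fmt.Errorf("partition %d out of range [0, %d)", p, numPartitions)
	}
	return nil
}

func main() {
	const actual = 100 // partitions the topic really has (assumed known)

	// The bug from the issue: the partitioner believes there are 1000.
	p, err := partitionFor([]byte("some-key"), 1000)
	if err != nil {
		panic(err)
	}
	if err := checkPartition(p, actual); err != nil {
		fmt.Println("refusing to produce:", err)
		return
	}
	fmt.Println("producing to partition", p)
}
```

Running the bounds check against the real partition count turns a misconfigured partitioner into an explicit error at the call site, independent of how the client reports the failure.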

twmb commented 2 months ago

It sounds like you were using manual partitioning? The client forever tries to send to partitions that don't exist. Usually, it just outright won't happen that you try to produce to a partition that does not exist. The number of partitions in a topic only goes up (stonks), so if a partition can be produced to via metadata discovery, the only way for it not to be produced to in the future is if you delete a topic and recreate it. Or, if you have a bug and try producing to a partition manually that doesn't exist.

There is an option, UnknownTopicRetries, that causes produces to fail when the entire topic does not exist. This can probably be extended -- though not super easily -- to when partitions keep repeatedly receiving this error. If you want to bite this off, feel free (I can try helping on discord). I may have some time to address this once I finally solve the implementation for the new KIP (that I'm not prioritizing that highly, to be honest).

alistairking commented 2 months ago

Right, that's exactly what happened in our case -- we had a bug in a manual partitioner so it was producing to partitions that didn't (and wouldn't) exist.

Now that I've fixed the bug I'm not super worried about it, but it was a bit surprising that there was no logging whatsoever. Would it be easier (and would it make sense) to emit a warning log after some number of retries producing to a non-existent partition?

twmb commented 2 months ago

There's definitely a log, but only at the debug level.

Looking at the code, though (https://github.com/twmb/franz-go/blob/6a58760afaa702521cf2623f4077d034ee2b445e/pkg/kgo/sink.go#L1298-L1305), the unknown check is actually at the partition level (recBuf == per-partition buffer for records).

I think maybe something else is up -- IIRC, the records are never even attempted to be produced because the client knows the partition does not exist. In fact, they should be failed outright, because of this chunk of code: https://github.com/twmb/franz-go/blob/6a58760afaa702521cf2623f4077d034ee2b445e/pkg/kgo/producer.go#L578-L581

Something else is up here. I'll see about reproducing this locally with kfake...

pikrzysztof commented 2 months ago

These guys here https://www.youtube.com/watch?v=paVdXL5vDzg (starting at 17:45, ending at 20:50) demonstrate that the Java client will synchronously block until the partition gets created. No log message is produced.

twmb commented 1 month ago

Sorry for the delay on getting back to this, but I can't reproduce the initial problem when running the following locally against a topic ("foo") with 10 partitions:

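package main

import (
    "context"
    "fmt"

    "github.com/twmb/franz-go/pkg/kgo"
)

func main() {
    // Manual partitioning: the client produces to whatever r.Partition says.
    cl, err := kgo.NewClient(
        kgo.DefaultProduceTopic("foo"),
        kgo.RecordPartitioner(kgo.ManualPartitioner()),
    )
    if err != nil {
        panic(err)
    }
    defer cl.Close()

    r := &kgo.Record{
        Value:     []byte("foo"),
        Partition: 0,
    }

    // Partition 0 exists, so this produce succeeds.
    {
        r, err := cl.ProduceSync(context.Background(), r).First()
        fmt.Println(r, err)
    }

    // Partition 11 does not exist (the topic has partitions 0-9), so this fails.
    r.Partition = 11
    {
        r, err := cl.ProduceSync(context.Background(), r).First()
        fmt.Println(r, err)
    }
}

$ go run main.go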
&{[] [102 111 111] [] 2024-05-25 20:14:28.1 -0600 MDT foo 0 {0} 0 2039761367311953381 0 0 context.Background} <nil>
&{[] [102 111 111] [] 2024-05-25 20:14:28.1 -0600 MDT foo 0 {0} 0 0 0 0 context.Background} invalid record partitioning choice of 11 from 10 available

I'm going to close this for now but if you have more details (maybe a slimmed down repro) about how you ran into a record being invisibly dropped, I'm open to investigate further.

alistairking commented 1 month ago

Thanks for looking into this. It turns out that we were not passing an error callback function to our async Produce calls, so the errors were being silently dropped. Sorry for the fire drill!