lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Lost lots of messages due to async produce #192

Closed XiaochenCui closed 4 years ago

XiaochenCui commented 5 years ago

I recently wrote a test script that produces 20 million messages to Kafka with goka, but only about 14 million messages are in the topic after the produce completes. There is enough disk space and no error messages in the Kafka logs or the client logs. The script and Kafka run in Docker, on the same subnet.

Test script:

func GokaAsyncProduce() {
    emitter, err := goka.NewEmitter(
        viper.GetStringSlice("kafkaConfig.brokerUrls"),
        goka.Stream(viper.GetString("kafkaConfig.topic")),
        new(codec.String),
    )
    if err != nil {
        log.Fatalf("error creating emitter: %v", err)
    }

    // startTime, endTime, count, collectInterval, getPkg, PrintMemUsage and
    // PrintCPUUsage are defined elsewhere in the test script.
    startTime = time.Now().UnixNano()

    preTime := time.Now().UnixNano()
    preN := 0

    for n := 0; n < count; n++ {
        bs := getPkg()
        // the first return value (a Promise) is discarded here
        _, err = emitter.Emit("", string(bs))
        if err != nil {
            log.Fatalf("error emitting message: %v", err)
        }

        currTime := time.Now().UnixNano()
        if float64(currTime-preTime) > float64(collectInterval)*math.Pow10(9) {
            currN := n - preN
            currSpeed := currN / collectInterval
            fmt.Printf("produce speed: %v pps\n", currSpeed)

            preTime = currTime
            preN = n

            PrintMemUsage()
            PrintCPUUsage()
        }
    }
    emitter.Finish()

    endTime = time.Now().UnixNano()
}

Messages are counted with:

docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka:9092 --topic test --time -1 --offsets 1

db7 commented 5 years ago

That sounds weird. What is the retention time of the topic? It's not log compacted, right?

XiaochenCui commented 5 years ago

Here is the log retention policy:

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

And log compaction is disabled.

I tested 20 million messages using Sarama, and there are exactly 20 million messages after the test finished:

docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell  --broker-list kafka:9092 --topic test --time -1 --offsets 1
test:0:20000000

I will test goka again soon and post the result here.

Thank you.

XiaochenCui commented 5 years ago

Oops, there are only 11 million messages in the topic.

docker-compose exec kafka kafka-run-class.sh kafka.tools.GetOffsetShell  --broker-list kafka:9092 --topic test --time -1 --offsets 1
test:0:11576072

yanyuzhy commented 5 years ago

@XiaochenCui Could you try multiple times and then paste the results? That way we can get more information about this issue.

frairon commented 5 years ago

@XiaochenCui One more error check could also help: emitter.Emit(...) returns a Promise which may resolve with an error. Maybe there are errors you aren't seeing because you're not checking for that. Like this:

prom, err := emitter.Emit("", string(bs))
if err != nil {
    log.Fatalf("error emitting message: %v", err)
}
prom.Then(func(err error) {
    if err != nil {
        log.Fatalf("error emitting message (in promise): %v", err)
    }
})
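
For a 20-million-message run it might also help to count failures instead of aborting on the first one. A rough sketch, where failed is just an illustrative int64 counter declared before the loop and updated via the standard sync/atomic package:

prom, err := emitter.Emit("", string(bs))
if err != nil {
    log.Fatalf("error emitting message: %v", err)
}
prom.Then(func(err error) {
    if err != nil {
        // the callback runs asynchronously once the broker has responded
        atomic.AddInt64(&failed, 1)
        log.Printf("error emitting message (in promise): %v", err)
    }
})
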
XiaochenCui commented 5 years ago

@frairon

2019/06/21 08:30:50 error emitting message: kafka server: Message was too large, server rejected it to avoid allocation error.

frairon commented 5 years ago

Yep, nice :) So at least one message is too large. Kafka's limit is 1MB per message by default, but I'd recommend using much smaller messages. If your messages are < 1MB, you could try emitter.EmitSync(...), which sends messages one by one. It's much slower than batching, but it would let you narrow the error down to the batching. If your messages are > 1MB, you have to split them up. I think the Kafka documentation does not recommend increasing the max-message-size config.
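
A rough sketch of the synchronous variant, reusing the loop from your test script above (only the emit call changes):

for n := 0; n < count; n++ {
    bs := getPkg()
    // EmitSync blocks until the message has been acknowledged (or rejected) by the broker,
    // so a "message too large" error is reported directly for the offending message.
    if err := emitter.EmitSync("", string(bs)); err != nil {
        log.Fatalf("error emitting message %d: %v", n, err)
    }
}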

XiaochenCui commented 5 years ago

@frairon But the messages are only 30-1000 bytes each, so what is the root cause?

frairon commented 5 years ago

Well, if Kafka says the messages are too big, then I guess they really are too big, one way or another. If your individual messages really are only 1000 bytes at most, then it has to do with the internal batching of the Kafka producer (which I doubt, but I wouldn't know where else to look). So have you tried sending with EmitSync to check whether the errors still occur?

XiaochenCui commented 5 years ago

@frairon Not yet; the performance of EmitSync is only about 10 messages per second, which is far from meeting our expectations.

frairon commented 5 years ago

@XiaochenCui Yes, I understand the performance issue here, but trying it would make sure that it's not sarama's internal batching producing messages that are too big. Anyway, we have experienced that error a couple of times and it was always a message being too big. Always. So to make sure they aren't too big, you could log the message size in the error handler. As long as we can't be 100% sure that the messages aren't too big, it's a waste of time to look somewhere else for the cause of the error.
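
Something along these lines, reusing the promise handler from above (just a sketch):

bs := getPkg()
prom, err := emitter.Emit("", string(bs))
if err != nil {
    log.Fatalf("error emitting message: %v", err)
}
prom.Then(func(err error) {
    if err != nil {
        // log the payload size so we can verify whether the rejected message really is small
        log.Printf("emit failed for a message of %d bytes: %v", len(bs), err)
    }
})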

frairon commented 4 years ago

I'll close this for now; feel free to reopen if it's still an issue.