lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Examples `simplest` and `clicks` panic with "non-positive interval for NewTicker" #258

Closed pofl closed 4 years ago

pofl commented 4 years ago

Simplest and clicks are the only examples I tried so I can't speak for the others.

I copied the Makefile from ./examples and ran `make start`. Then I copied the main.go from each example and ran `go run .`

Output for "simplest" is

message emitted
2020/07/06 14:36:32 Processor [example-group]: starting
2020/07/06 14:36:32 Processor [example-group]: creating consumer
2020/07/06 14:36:32 Processor [example-group]: creating producer
2020/07/06 14:36:32 Processor: rebalancing: map[]
2020/07/06 14:36:32 Processor: dispatcher started
panic: non-positive interval for NewTicker

goroutine 118 [running]:
time.NewTicker(0x0, 0x0)
        /home/linuxbrew/.linuxbrew/Cellar/go/1.14.4/libexec/src/time/tick.go:23 +0x147
github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc0002e0000, 0xc000328ba0)
        /home/pofl/Documents/go/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/consumer.go:452 +0x5a
github.com/bsm/sarama-cluster.(*loopTomb).Go.func1(0xc00030ace0, 0xc0003108f0)
        /home/pofl/Documents/go/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/util.go:73 +0x7b
created by github.com/bsm/sarama-cluster.(*loopTomb).Go
        /home/pofl/Documents/go/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/util.go:69 +0x66
exit status 2

Output for "clicks" is

2020/07/06 14:25:13 Table mini-group-table has 10 partitions
2020/07/06 14:25:13 Processor [mini-group]: starting
2020/07/06 14:25:13 Processor [mini-group]: creating consumer
2020/07/06 14:25:13 Processor [mini-group]: creating producer
2020/07/06 14:25:13 Processor: rebalancing: map[]
2020/07/06 14:25:13 Processor: dispatcher started
View opened at http://localhost:9095/
2020/07/06 14:25:13 view [mini-group-table]: starting
2020/07/06 14:25:13 view [mini-group-table]: partition 9 started
2020/07/06 14:25:13 view [mini-group-table]: partition 3 started
2020/07/06 14:25:13 view [mini-group-table]: partition 5 started
2020/07/06 14:25:13 view [mini-group-table]: partition 6 started
2020/07/06 14:25:13 view [mini-group-table]: partition 7 started
2020/07/06 14:25:13 view [mini-group-table]: partition 4 started
2020/07/06 14:25:13 view [mini-group-table]: partition 8 started
2020/07/06 14:25:13 view [mini-group-table]: partition 1 started
2020/07/06 14:25:13 view [mini-group-table]: partition 0 started
2020/07/06 14:25:13 view [mini-group-table]: partition 2 started
2020/07/06 14:25:13 Processor: dispatcher stopped
panic: non-positive interval for NewTicker

goroutine 243 [running]:
time.NewTicker(0x0, 0x0)
        /home/linuxbrew/.linuxbrew/Cellar/go/1.14.4/libexec/src/time/tick.go:23 +0x147
github.com/bsm/sarama-cluster.(*Consumer).cmLoop(0xc0001da000, 0xc000182ba0)
        /home/pofl/Documents/go/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/consumer.go:452 +0x5a
github.com/bsm/sarama-cluster.(*loopTomb).Go.func1(0xc0000ac9e0, 0xc005596250)
        /home/pofl/Documents/go/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/util.go:73 +0x7b
created by github.com/bsm/sarama-cluster.(*loopTomb).Go
        /home/pofl/Documents/go/pkg/mod/github.com/bsm/sarama-cluster@v2.1.15+incompatible/util.go:69 +0x66
exit status 2

The problem seems to be with the sarama-cluster config. The panic occurs here: https://github.com/bsm/sarama-cluster/blob/master/consumer.go#L452

sarama-cluster is deprecated btw.

R053NR07 commented 4 years ago

Hey pofl, we are currently working on upgrading goka. The NewTicker problem is triggered by an invalid sarama version. You can use an older one or upgrade goka. We will release the upgraded goka version today (or in the following days), which will fix this issue. In the meantime, you can test the new release by using the "v0.9.0-beta3" release.

You can also have a look at this issue: https://github.com/lovoo/goka/issues/255

pofl commented 4 years ago

Damn, sorry, it didn't occur to me that I could just browse the issues to see if this problem was reported before. Shame on me.

frairon commented 4 years ago

No worries @pofl :). Anyway, the new version is released now, so give it a try and let us know if you have any issues.

pofl commented 4 years ago

The issues are fixed.

I'm using goka on the Kafka side to build a rather simple forwarding microservice that forwards events from Kafka to Google Pub/Sub. I chose goka because I had heard that you guys do some background magic around committing messages. Using goka, I don't need to commit messages myself. So I was wondering if you could tell me, or better, write a few lines of docs about the fault-tolerance magic you do and how much of an exactly-once guarantee you might be able to provide.

Also I am missing a little bit of explanation about what kind of processor graph I'm building. What are nodes and edges in that graph abstraction and what does it do?

These two things are the most glaring questions I have so far.

frairon commented 4 years ago

Hmm, I'm not sure what exactly you mean by "background magic" before committing. All that goka does is wait for all operations in the message callback to succeed (like storing the state, emitting to output topics) before it commits the message upstream. If your processor crashes in the middle, you'll receive the message again next time. So goka provides at-least-once semantics, not more. That's also described here.

For the description of Edges you're right, we could add a little more to the docs for sure. In the meantime, just try out the examples, it's less complicated than you think :)

pofl commented 4 years ago

I mean, e.g., a case where offsets 42 and 43 are processed in parallel but 42 fails. Is 43 committed? Will 42 get reprocessed? AFAIK, if you only use Kafka features to track processing, 42 would not get reprocessed if 43 is already committed. So do you have your own persistent process-tracking mechanism and, if yes, how does it work on a high level?

db7 commented 4 years ago

@pofl I assume offsets 42 and 43 refer to the same key. If they are different keys, there are no guarantees. If they are the same key, 42 and 43 are not processed in parallel because there is a single goroutine in the processor group that will process all messages of a key, sequentially.

But 42's side effects (creating updates, emitting messages) may still be uncommitted before the goroutine starts processing subsequent offsets. The commit of 42 is only performed when all its side effects are committed. And 43 should not be committed if 42 hasn't been committed yet. At least this was the behavior before the refactoring. I guess the new implementation also follows this strategy.
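
One common way to get "43 is not committed before 42" is to track finished offsets and advance the commit point only across a gap-free prefix. The sketch below shows that general technique; `offsetTracker` is hypothetical and is not taken from goka's source, so it should not be read as how goka implements it.

```go
package main

import "fmt"

// offsetTracker is a hypothetical helper, not goka's implementation.
// It remembers which offsets have finished and only moves the commit
// point forward across offsets that are all done.
type offsetTracker struct {
	next int64          // lowest offset that is not yet committable
	done map[int64]bool // finished offsets at or above next
}

func newOffsetTracker(start int64) *offsetTracker {
	return &offsetTracker{next: start, done: map[int64]bool{}}
}

// markDone records that an offset and all its side effects have finished.
// It returns the commit point, i.e. the next offset to read after a restart.
func (t *offsetTracker) markDone(offset int64) int64 {
	if offset >= t.next {
		t.done[offset] = true
	}
	// Advance only while there is no gap.
	for t.done[t.next] {
		delete(t.done, t.next)
		t.next++
	}
	return t.next
}

func main() {
	t := newOffsetTracker(42)
	fmt.Println(t.markDone(43)) // 42: offset 42 is still pending, nothing moves
	fmt.Println(t.markDone(42)) // 44: both 42 and 43 are done
}
```

If the process crashes after 43 finished but before 42 did, the commit point is still 42, so both offsets are read again after a restart.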

db7 commented 4 years ago

Also, there is no exactly-once guarantee in Goka. For example, if processing offset 42 generates messages A and B, and emitting B fails, then A and B will be emitted again when 42 is reprocessed. So you'll see A twice downstream. Goka will give you at-least-once. If you need exactly-once, you should look into ksql or kstreams. However, usually one doesn't need exactly-once, and it's sufficient to make the processing of events idempotent.
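
As a rough illustration of idempotent processing, a downstream consumer can ignore events it has already applied. This sketch assumes every event carries a unique ID set by the producer; the `event` and `account` types are hypothetical and unrelated to goka's API.

```go
package main

import "fmt"

// event is a hypothetical message with a unique ID assigned by the producer.
type event struct {
	id     string
	amount int
}

// account applies each event at most once by remembering the IDs it has seen.
type account struct {
	balance int
	applied map[string]bool
}

// apply is idempotent: applying the same event again changes nothing.
func (a *account) apply(e event) {
	if a.applied[e.id] {
		return // duplicate delivery, already applied
	}
	a.applied[e.id] = true
	a.balance += e.amount
}

func main() {
	a := &account{applied: map[string]bool{}}
	// A is delivered twice, as in the example with offset 42 above.
	for _, e := range []event{{"A", 10}, {"B", 5}, {"A", 10}} {
		a.apply(e)
	}
	fmt.Println(a.balance) // 15
}
```

In a real service the set of applied IDs would have to be stored together with the state it protects, otherwise a crash between the two writes brings the duplicates back.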

pofl commented 4 years ago

Thank you for your answers. I would suggest copy-pasting this conversation to some wiki page or something, so the next person with this question won't have to ask 😊 Keep up the great work!