lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

bug error setting up for partition #415

Closed kumin closed 4 months ago

kumin commented 1 year ago

Sometimes I get this bug, but I don't know why:

kafka tells us there's no message in the topic, but our cache has one. The table might be gone. Try to delete your local cache! Topic, partition 2, hwm 0, local offset 3

My table topic is not empty

devcorpio commented 1 year ago

Hi @kumin,

I'm not a goka contributor, but I saw this message while trying to understand a problem of my own. Although I still don't understand the cause of the error, I have found two ways to get unblocked if it happens again (the second option is my favourite). Either way, it does seem like a bug:

  1. Changing the topic name and the group name made the error disappear.
  2. rm -rf /tmp/goka (the local cache mentioned in the log). Of course, you can inspect the contents there first if you're curious; in my case there was a processor folder and group-table data.

I have found the path of the cache here:

https://github.com/lovoo/goka/blob/d1560005ad543a34669ed1686b8788e460820020/options.go#L44

Hope this helps

devcorpio commented 1 year ago

Hi again,

I have found steps to reproduce the issue (I understand that different situations can lead to the same error, but maybe this helps you anyway).

Context: I'm using Docker. Important: I'm not persisting the Kafka logs on my host, since for now I'm just in the first stages of what I want to build.

Steps:

  1. rm -rf /tmp/goka (so you start from a working state that will later start failing)
  2. start your ZooKeeper and broker(s)
  3. start your processor(s)
  4. generate/produce your data
  5. you will see the processor working (and your logs, if you have any)
  6. --- now the steps that reproduce the error ---
  7. stop your processor(s)
  8. make sure the Kafka logs are gone (in my case that means destroying the containers and starting them again; in a nutshell, a "brand-new" Kafka setup)
  9. start the processor(s)
  10. you will see the error! The local cache has data, but Kafka does not, since it is a brand-new cluster.
(screenshot of the resulting error, 2023-01-17)

Again, that's just my scenario, but I hope it helps you somehow.

frairon commented 1 year ago

Hi all,

@kumin could your topic name be empty? According to the code, the topic name should be included in the error message, but it's empty. So that looks like there's something wrong with the name.

In case this was not the issue, here's some background: a goka processor keeps a local copy of a table in a key-value store on the file system. That's done for its own processor table (accessible via ctx.Value/ctx.SetValue) and all lookup tables (ctx.Lookup/ctx.Join). To keep the tables in sync with their backing Kafka topic, goka initially consumes the whole topic from the beginning and then continuously consumes all updates. The last offset is stored along with the data, so after a restart the syncing can just continue without reconsuming the whole table. The procedure is roughly like this (each partition works on its own):

  1. open the local storage and read the offset stored alongside the data
  2. ask Kafka for the partition's high watermark (hwm)
  3. if the local offset is beyond the hwm, abort with the error above, because the local state claims to be newer than the topic itself
  4. otherwise consume from the local offset up to the hwm, then keep applying live updates

Goka fails because it doesn't know from which point it can recover safely; the conditions are simply invalid. It has a local offset that does not exist in its source yet, and that can't be right.

So the main reason is that the topic is gone or was recreated, but it doesn't contain as many messages as before. Either way, the only thing you can do is delete the local storage. If deleting is impossible or cumbersome for testing, you can use memory storage instead. That means all data will be lost when the processor restarts, so maybe don't use it for huge topics. Pass the following when initializing the processor:

goka.NewProcessor(...,
    goka.WithStorageBuilder(func(topic string, partition int32) (storage.Storage, error) {
        return storage.NewMemory(), nil
    }),
)

Hope that helps, let me know if you found the solution or if you need more support.