lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

How to fill up local cache with messages from a custom offset on start up? #425

Open onkarbanerjee opened 1 year ago

onkarbanerjee commented 1 year ago

We are using goka to join event streams, and it has been running for over two years now. Every time the pod is restarted, the goka processor starts backfilling with two-year-old data. This causes huge memory requirements for the service using the goka processor, and we do not need the two-year-old data anyway. How can I configure the goka processor to start backfilling from a custom offset or timestamp?

frairon commented 1 year ago

Hi @onkarbanerjee,

sorry for the late reply! Currently there is no way to configure a goka processor to start backfilling from anywhere other than the beginning. This is intentional, to guarantee the integrity of the data. If you want to remove old entries from the processor table, those entries should be deleted by the processor itself using ctx.Delete(). But how to do that is a different story :).
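As a rough idea of what such a cleanup could look like, here is a minimal sketch, not goka's own recipe. It assumes the table value carries a last-updated timestamp; `joinState`, `maxAge`, `expireIfStale` and `fakeContext` are made-up names for illustration. `tableContext` is a reduced, hypothetical interface covering only the `Value()`, `Timestamp()` and `Delete()` methods of `goka.Context`, so the snippet runs without a Kafka cluster.

```go
package main

import (
	"fmt"
	"time"
)

// tableContext is a hypothetical, reduced view of goka.Context that contains
// only the methods this sketch needs.
type tableContext interface {
	Value() interface{}   // current table value for the message key, nil if absent
	Timestamp() time.Time // timestamp of the message being processed
	Delete()              // removes the table value for the message key
}

// joinState is an assumed table value that remembers when it was last updated.
type joinState struct {
	LastUpdated time.Time
}

// maxAge is an assumed retention period; pick whatever fits your join window.
var maxAge = 30 * 24 * time.Hour

// expireIfStale deletes the table entry of the current key if it is older
// than maxAge, and reports whether it did so.
func expireIfStale(ctx tableContext, maxAge time.Duration) bool {
	state, ok := ctx.Value().(*joinState)
	if !ok || state == nil {
		return false // nothing stored for this key
	}
	if ctx.Timestamp().Sub(state.LastUpdated) <= maxAge {
		return false // still fresh, keep it
	}
	ctx.Delete()
	return true
}

// fakeContext is an in-memory stand-in used only to run the sketch.
type fakeContext struct {
	value   interface{}
	now     time.Time
	deleted bool
}

func (f *fakeContext) Value() interface{}   { return f.value }
func (f *fakeContext) Timestamp() time.Time { return f.now }
func (f *fakeContext) Delete()              { f.value = nil; f.deleted = true }

// newFakeContext builds a context whose stored entry is ageDays old.
func newFakeContext(ageDays int) *fakeContext {
	now := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	age := time.Duration(ageDays) * 24 * time.Hour
	return &fakeContext{value: &joinState{LastUpdated: now.Add(-age)}, now: now}
}

func main() {
	old := newFakeContext(730) // roughly two years old
	fresh := newFakeContext(1)
	fmt.Println(expireIfStale(old, maxAge), expireIfStale(fresh, maxAge)) // prints "true false"
}
```

In a real processor you would call something like `expireIfStale` from within the callback. Keep in mind that a callback only sees the key of the message it is currently handling, so entries for keys that never receive another message would still need some trigger of their own. That trigger is the part this sketch leaves open.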

Usually, a processor is not meant to recover (backfill) all data from its table topic on every start. Instead, the table is persisted on disk, and the processor only recovers the changes since the last run. If you have only one instance, there wouldn't be anything to recover after a restart. I'm guessing you're running on Kubernetes, so consider the following points:

How to configure a storage directory

When you initialize your processor you will have to configure the storage layer one way or another. Otherwise the data ends up in /tmp/goka, which is not the right place in almost all cases. To get the default storage, with just the folder changed, do

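    // imports: "github.com/lovoo/goka" and "github.com/lovoo/goka/storage"
    p, err := goka.NewProcessor(
        []string{"localhost:9092"}, // brokers
        goka.DefineGroup(
            "group",
            /* edges ...*/
        ),
        // keep the local table in a persisted directory instead of /tmp/goka
        goka.WithStorageBuilder(storage.DefaultBuilder("/path/to/persisted/directory")),
    )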

As said, this is required in most cases for initializing processors or views and the documentation is probably lacking that info a bit :)
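If the path differs between environments, one option is to resolve it at startup and make sure the directory exists before building the processor. The following is only a sketch under assumptions: the environment variable name `GOKA_STORAGE_DIR` and the helpers `storagePath`, `defaultDir` and `isDir` are invented for this example and are not part of goka.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// storagePath picks the storage directory from the (assumed) GOKA_STORAGE_DIR
// variable, falls back to the given default, and creates the directory if it
// does not exist yet. getenv is injected so the function is easy to test.
func storagePath(getenv func(string) string, fallback string) (string, error) {
	dir := getenv("GOKA_STORAGE_DIR")
	if dir == "" {
		dir = fallback
	}
	dir = filepath.Clean(dir)
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return "", fmt.Errorf("creating storage directory %q: %w", dir, err)
	}
	return dir, nil
}

// defaultDir is a throwaway location for trying the sketch locally.
func defaultDir() string {
	return filepath.Join(os.TempDir(), "goka-example")
}

// isDir reports whether path exists and is a directory.
func isDir(path string) bool {
	info, err := os.Stat(path)
	return err == nil && info.IsDir()
}

func main() {
	dir, err := storagePath(os.Getenv, defaultDir())
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	// The resolved path would then be passed on, for example:
	//   goka.WithStorageBuilder(storage.DefaultBuilder(dir))
	fmt.Println("storage directory:", dir)
}
```

On Kubernetes the directory only helps if it survives a pod restart, so it would have to live on a persistent volume rather than on the container's own filesystem.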

Let us know if that fixes the issue!