redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Memory cache never compacts expired keys with a stable set of cache keys #1993

Open · dudleycarr opened this issue 1 year ago

dudleycarr commented 1 year ago

We're using Benthos to periodically monitor and process a set of IDs. To avoid needless reprocessing, we cache each ID for a fixed period. If an ID still hasn't been processed successfully downstream when its cache key's TTL expires, we expect it to have been purged from the cache so that it gets reprocessed.

Minimal configuration:

input:
  generate:
    interval: 1s
    mapping: root.key = "a"
pipeline:
  threads: -1
  processors:
    - cache:
        resource: mycache
        operator: add
        key: '${! json("key") }'
        value: "foo"
        ttl: 10s
    - mapping: root = if errored() { deleted() }
output:
  label: ""
  stdout:
    codec: lines
cache_resources:
  - label: mycache
    memory:
      compaction_interval: 1s

Expected output after 10s:

{"key":"a"}
{"key":"a"}

Actual output after 10s:

{"key":"a"}

The current workaround is to write (set) some key on every pass through the pipeline, which forces a compaction that removes expired cache entries.

Jeffail commented 1 year ago

Hey @dudleycarr, yeah, that's annoying. Ideally we should have a background process that runs occasional compactions.

We currently document that "The compaction interval determines how often the cache is cleared of expired items, and this process is only triggered on writes to the cache", so changing this is technically a breaking change. We could perhaps add an opt-in field such as enable_background_compactions: true instead.

kmpm commented 4 months ago

How about checking isExpired in the Add func? Concretely, in internal/impl/pure/cache_memory.go, change

    if _, exists := shard.items[key]; exists {
        shard.Unlock()
        return service.ErrKeyAlreadyExists
    }

to

    if k, exists := shard.items[key]; exists && !shard.isExpired(k) {
        shard.Unlock()
        return service.ErrKeyAlreadyExists
    }

An Add could arguably be considered a kind of write, so the documentation would still be roughly correct. Writing a key that should already have been cleared is also a kind of write :)

I'll gladly create a PR.

AndreasBergmeier6176 commented 2 days ago

Replying to Jeffail's suggestion of an enable_background_compactions: true field:

What about a field compaction_strategy with values onmutate (default) or background?
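One way the suggested compaction_strategy field could be parsed is sketched below in Go. Neither the field nor the CompactionStrategy type exists in Redpanda Connect today; both are hypothetical, as is the choice to treat an empty value as onmutate so that existing configs keep the documented behavior.

```go
package main

import "fmt"

// CompactionStrategy models the hypothetical compaction_strategy field.
type CompactionStrategy int

const (
	// OnMutate compacts only when the cache is written to (current behavior).
	OnMutate CompactionStrategy = iota
	// Background compacts on a timer, independent of writes.
	Background
)

// parseCompactionStrategy maps the config string to a strategy, defaulting
// to OnMutate when the field is empty so existing configs are unaffected.
func parseCompactionStrategy(s string) (CompactionStrategy, error) {
	switch s {
	case "", "onmutate":
		return OnMutate, nil
	case "background":
		return Background, nil
	default:
		return OnMutate, fmt.Errorf("unknown compaction_strategy %q: expected onmutate or background", s)
	}
}

func main() {
	for _, v := range []string{"", "onmutate", "background", "hourly"} {
		s, err := parseCompactionStrategy(v)
		fmt.Printf("%q -> %d, err=%v\n", v, s, err)
	}
}
```

Compared with a boolean enable_background_compactions flag, an enum leaves room for further strategies later without piling up mutually exclusive booleans.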