tkem / cachetools

Extensible memoizing collections and decorators

"Batch Caching" method #276

Closed bgirschig closed 1 year ago

bgirschig commented 1 year ago

I've created this feature for myself, and was wondering if you'd be interested in a pull request.

Example situation

Consider a function that fetches a batch of items in a single call: it takes a list of item ids and returns the corresponding items. A regular cache wrapper (e.g. @cached) doesn't work here, because the whole item id list becomes the cache key, so only calls with the exact same list of item ids result in a cache hit.
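For illustration, a minimal sketch of the problem (fetch_items is a hypothetical batch function; a tuple argument is used because @cached requires hashable arguments):

from cachetools import cached, LRUCache

@cached(cache=LRUCache(maxsize=128))
def fetch_items(item_ids: tuple[str, ...]) -> list[str]:
    # Hypothetical expensive batch lookup; the whole tuple is the cache key.
    print(f"fetching {item_ids}")
    return [f"item {item_id}" for item_id in item_ids]

fetch_items(("A", "B"))       # miss: fetches A and B
fetch_items(("A", "B", "C"))  # miss again, even though A and B were just fetched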

Proposed solution

I created a wrapper that caches individual items and calls the batch function with only the item ids that were not found in the cache:

from functools import wraps
from enum import Enum

# Inspired by functools's _make_key: build a flat, hashable key from the
# positional and keyword arguments.
kwd_mark = object()
def make_cache_key(*args, **kwargs):
    return args + (kwd_mark,) + tuple(sorted(kwargs.items()))

class OutputMode(Enum):
    LIST = 'list'
    DICT = 'dict'

def batch_cache(cache, output_mode: OutputMode):
    # Because we're dealing with batches, the cache wrapper needs to be aware
    # of the cached function's output structure.
    def get_item_from_response(response, key, idx):
        if output_mode == OutputMode.DICT:
            return response.get(key)
        if output_mode == OutputMode.LIST:
            return response[idx]
        raise ValueError(f"Unexpected output mode: {output_mode}")

    def batch_cache_outer(fn):
        @wraps(fn)
        def batch_cache_inner(item_keys, **kwargs):
            output_items = {}

            item_keys_not_in_cache = []
            for item_key in item_keys:
                # Cache keys are not the same as item keys, because they include
                # kwargs, so that an item with the same key but different kwargs
                # can give different results.
                cache_key = make_cache_key(item_key, **kwargs)

                if cache_key in cache:
                    output_items[item_key] = cache[cache_key]
                else:
                    item_keys_not_in_cache.append(item_key)

            response = fn(item_keys_not_in_cache, **kwargs)

            for item_idx, item_key in enumerate(item_keys_not_in_cache):
                item = get_item_from_response(response, item_key, item_idx)

                # add item to the current batch response
                output_items[item_key] = item

                # add item to the cache
                cache_key = make_cache_key(item_key, **kwargs)
                cache[cache_key] = item

            if output_mode == OutputMode.DICT:
                return output_items
            if output_mode == OutputMode.LIST:
                return [output_items[item_key] for item_key in item_keys]
            raise ValueError(f"Unexpected output mode: {output_mode}")

        return batch_cache_inner

    return batch_cache_outer
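For completeness, a function in DICT mode returns a mapping from item key to item instead of a positional list. A minimal sketch (get_stuff_dict is a hypothetical function, using a plain cachetools LRUCache):

from cachetools import LRUCache

dict_cache = LRUCache(maxsize=128)

@batch_cache(cache=dict_cache, output_mode=OutputMode.DICT)
def get_stuff_dict(item_ids):
    # Hypothetical batch function returning a mapping from key to item.
    return {item_id: f"item {item_id}" for item_id in item_ids}

print(get_stuff_dict(["A", "B"]))  # {'A': 'item A', 'B': 'item B'}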

Example use

from cachetools import TTLCache
from datetime import timedelta

cache = TTLCache(maxsize=4, ttl=timedelta(hours=12).seconds)

@batch_cache(cache=cache, output_mode=OutputMode.LIST)
def get_stuff(item_ids: list[str], lang="en"):
    if not item_ids:
        return []
    print(f"very long and difficult process on the following items: {item_ids} (in {lang})")
    return [f"This is item {item_id} in {lang}" for item_id in item_ids]

print(get_stuff(["A", "B", "C"], lang="fr"))
print(get_stuff(["B", "C", "D"], lang="fr"))
print(get_stuff(["A", "C", "D"], lang="fr"))
print(get_stuff(["A", "C", "D"], lang="en"))

I think it would make sense to add this feature here, but I don't want to spend time adapting it to this repository if you don't think it's a good idea.

tkem commented 1 year ago

@bgirschig: Thanks for your interest, but due to time and resource constraints, no feature requests and/or PRs are currently accepted.