spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0
56.61k stars 38.13k forks

Reactive types support for `@Cacheable` methods [SPR-14235] #17920

Closed spring-projects-issues closed 1 year ago

spring-projects-issues commented 8 years ago

Pablo Díaz-López opened SPR-14235 and commented

Currently, when using cache annotations on beans, an Observable is cached like any other return value, so its emitted value is not cached.

I tried to use the following pattern to handle it:

@Cacheable("items")
public Observable<Item> getItem(Long id) {
    return Observable.just(id)
        .map(myrepo::getById)
        .cache();
}

On the happy path this works pretty well, since we cache the Observable's emitted values; but if getById throws, the Observable is cached together with the exception, which isn't how it should work.
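For illustration, the pitfall can be reproduced in plain Java, with no RxJava dependency (FailureCachingDemo and its method names are made up for this sketch): caching the deferred computation caches its failure too, and evicting failed entries is one possible remedy.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class FailureCachingDemo {
    static final ConcurrentMap<Long, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    // Stand-in for the @Cacheable method: the deferred result is what gets
    // cached, so a failed result gets cached too -- mirroring the .cache() issue.
    static CompletableFuture<String> getItem(Long id, boolean fail) {
        return cache.computeIfAbsent(id, k -> fail
            ? CompletableFuture.failedFuture(new RuntimeException("getById failed"))
            : CompletableFuture.completedFuture("item-" + k));
    }

    // One possible remedy: evict entries that completed exceptionally, so the
    // next call retries instead of replaying the cached error.
    static void evictIfFailed(Long id) {
        cache.computeIfPresent(id, (k, f) -> f.isCompletedExceptionally() ? null : f);
    }
}
```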

It would also be very nice to have support for Single.

If you give me some advice, I can try to submit a PR to solve this.


Affects: 4.2.5

Sub-tasks:

Referenced from: pull request https://github.com/spring-projects/spring-framework/pull/1066

1 vote, 8 watchers

cbornet commented 5 years ago

Hi, what is the status of this? If I understand correctly, there was discussion about a ReactiveCache, since JSR-107 is a blocking API. Has anything evolved since this issue was opened?

snicoll commented 5 years ago

Nothing has evolved in that area, I am afraid, and given the lack of proprietary implementations, I think cache vendors themselves aren't keen to explore that route either. This leaves us with the blocking model, which isn't a great fit for what @Cacheable has to offer.

There is a cache operator in Reactor, though, and I remember some initial work trying to harmonize this here. Perhaps @simonbasle can refresh my memory?

simonbasle commented 5 years ago

I made an attempt at an opinionated caching API in reactor-extra (see the reactor-addons repo), but it has not seen much focus or feedback since then, so I wouldn't claim it is perfect.

tomasulo commented 4 years ago

Are there any updates on this?

snicoll commented 4 years ago

No, the latest update on this is here. Most cache libraries (including JCache) are still on a blocking model, so my comment above still stands.

k631583871 commented 3 years ago

Are there any updates on this?

snicoll commented 3 years ago

@k631583871 I already replied to that question just above your comment.

ankurpathak commented 3 years ago

Redis has a reactive driver, so this could be implemented for Redis.

Bryksin commented 3 years ago

I made a new project for reactive caching with proper annotation usage, Spring Boot auto-configuration, and tests. It appears to be working well and will soon be deployed to our production.

bezrukavyy commented 2 years ago

Folks, is there a technical reason @Cacheable is not implemented for Reactor? I've implemented caching with Caffeine's AsyncCache as an AOP MethodInterceptor, but one of my colleagues cautioned that we have to be careful with the interceptor approach, since this is how the normal @Cacheable is implemented; if it were so trivial, why wouldn't Spring have added this support for Reactor? Is there a performance penalty somehow?

We can do something similar with a custom operator for the flow, but I really like the AOP interceptor approach - very clean and gives me access to the method's attributes in bulk.

howardem commented 2 years ago

Developers, myself included, have been waiting for Spring to support reactive types in the cache annotation implementation, also known as the Cache Abstraction. What is interesting in this story is that Micronaut, a relatively new framework compared to Spring, has supported reactive types in its cache annotation implementation since its 1.0.0 GA release back in October 2018. You can check it for yourself:

Not related to this topic, but another area where Micronaut has been more successful than Spring is supporting reactive types through its declarative HTTP client. Spring Cloud OpenFeign doesn't support reactive types, and the documentation explicitly says it won't until OpenFeign supports Project Reactor. Interestingly, Micronaut's declarative HTTP client supports any type that implements the org.reactivestreams.Publisher interface.

BTW, I'm a huge Spring Framework fan; I've been coding Spring-based applications since 2005 and have spent the last 5 years doing Spring Cloud microservices. But my dear Spring folks, in my honest opinion it is time to catch up!

simonbasle commented 2 years ago

@howardem yeah, Micronaut does have an AsyncCache API, which might in some cases use the backing cache provider's own async methods, or put some async lipstick on a blocking-only API (by running things in dedicated threads)...

One thing to consider is that Micronaut's Cacheable interceptor needs to work with the lowest common denominator of the cache providers it claims to support. As a result, caching the result of a reactive-returning method boils down to these steps:

  1. get from cache as Mono<Optional<T>>
  2. flatMap on the optional
  3. if valued, just unwrap the Optional to a T. DONE
  4. if empty, convert the original method return Publisher to a Mono
  5. flatMap the result of that Publisher and call asyncCache.put with it
  6. (if said publisher instead completes empty, continue the sequence with asyncCache.invalidate instead)

One glaring limitation is that this is not "atomic". It is comparable to calling (containsKey(k)) ? get(k) : put(k, v) on a ConcurrentMap instead of the atomic putIfAbsent(k, v).
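That difference can be sketched in plain Java (AtomicityDemo is a hypothetical name; a counter stands in for the expensive load): with check-then-act, several racing callers may each run the loader, while computeIfAbsent guarantees a single load per key.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicityDemo {
    // Has `threads` callers race to populate the same key; returns how many
    // times the loader actually ran.
    static int simulate(int threads, boolean atomic) {
        ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<>();
        AtomicInteger loads = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                    if (atomic) {
                        // atomic: the loader is invoked at most once for the key
                        cache.computeIfAbsent("k", k -> loads.incrementAndGet());
                    } else {
                        // check-then-act: every thread that sees an empty cache loads
                        if (!cache.containsKey("k")) {
                            cache.put("k", loads.incrementAndGet());
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return loads.get();
    }
}
```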

The thing is, caching atomically is even more important in the case of an asynchronous method.

Micronaut's AsyncCache interface does try to introduce some remediation in the form of get(K, Supplier<V>), but that Supplier is still a blocking thing. Again, this seems to be for lowest-common-denominator reasons, as only Caffeine has a truly async API including a get(K, Function<K, CompletableFuture<V>>) (it is actually a BiFunction, but you get the idea).

Note that Micronaut's own Cacheable interceptor doesn't even use that get(K, Supplier<V>) method from AsyncCache... For caching reactive types, I don't think it actually even tries to support the atomic parameter to Cacheable at all.

So yeah, it has an async cache abstraction. It is best effort and might hide subtle issues like cache stampedes, so I would take it with a grain of salt.

ben-manes commented 2 years ago

fwiw, I believe Quarkus implements its caches by requiring that they are asynchronous. Their interceptor checks if the return type is reactive and, if not, simply blocks indefinitely (unless a timeout is specified). Internally they use a Caffeine AsyncCache, saving a thread by performing the work on the caller: they manually insert and complete the future value. I'm not sure if they support any provider other than Caffeine; requiring asynchronous implementations and emulating a synchronous API by blocking makes it much easier. Maybe Spring Cache could iterate towards something similar (assuming some caveats for other providers can be made acceptable)?
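The "insert the future first, complete it on the caller" idea described above can be sketched in plain Java, without Quarkus or Caffeine (CallerCompletesDemo and expensiveLoad are made-up names): putIfAbsent publishes an incomplete future so concurrent callers share one load, and the winning caller does the work on its own thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CallerCompletesDemo {
    static final ConcurrentMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    static CompletableFuture<String> get(String key) {
        CompletableFuture<String> fresh = new CompletableFuture<>();
        CompletableFuture<String> existing = cache.putIfAbsent(key, fresh);
        if (existing != null) {
            return existing; // another caller already loaded (or is loading) this key
        }
        try {
            fresh.complete(expensiveLoad(key)); // the work runs on the caller's thread
        } catch (Exception e) {
            cache.remove(key, fresh); // don't keep failed loads in the cache
            fresh.completeExceptionally(e);
        }
        return fresh;
    }

    // Hypothetical expensive computation standing in for the real cache load.
    static String expensiveLoad(String key) {
        return key.toUpperCase();
    }
}
```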

simonbasle commented 2 years ago

yeah, that's interesting. they initially (very early) thought about supporting multiple caching providers but the reduction in scope (annotations only at first) and the focus on the 80% case led them to only use Caffeine as the underlying implementation 👍 they don't seem to use your get(K key, BiFunction<K, ExecutorService, CompletableFuture> loader) variant @ben-manes, but instead rely on putIfAbsent and computeIfPresent.

but yeah, they focused on an async API returning Uni reactive types (which can very easily be presented as CompletableFutures to Caffeine).

ben-manes commented 2 years ago

I think that I suggested the putIfAbsent approach so that they could reuse the caller thread. There were many PRs as they iterated and I advised from a point of ignorance (as I use Guice at work, no annotated caching, and it’s all a weekend hobby). I think going async was an evolution of their internal apis, but my knowledge is very fuzzy. I hope that other caching providers become async friendly to enable similar integrations, but I don’t know the state of that support.

62mkv commented 2 years ago

hey @Bryksin, how has it been working for you? We also currently use a Redis-backed cache (via Redisson), but we have to implement it at the code level, along the lines of

getFromCache(key)
    .switchIfEmpty(getFromUpstream().delayUntil(value -> putToCache(key, value)))

which I don't like too much, as caching should be treated as a cross-cutting concern, easily disabled if needed.

I like the API of your library (https://github.com/Bryksin/redis-reactive-cache) but my coworkers are hesitant given apparent lack of community adoption :(

also would be interesting to learn of @simonbasle take on this approach 🙏

Bryksin commented 2 years ago

Hi @62mkv, unfortunately usage of the lib is very low and I have no time to focus on it properly. It was just an MVP version and definitely has room for improvement, at least:

Time goes by, but such an important aspect as caching continues to be ignored by Spring for the reactive stack for some reason...

snicoll commented 2 years ago

@Bryksin with regards to "ignoring for some reason", the comment above yours should hopefully help nuance that.

mraible commented 2 years ago

This seems like it might be why the following doesn't work for me in a WebFlux app:

@Cacheable(cacheNames="users", key="#token")
public Mono<Jwt> enrich(String token, Jwt jwt) { } 

It still seems like this method is hit every time, even though the token value is the same.

Code and comment from https://github.com/jhipster/generator-jhipster/pull/18241#issuecomment-1127083403.

Is the recommendation that we don't use Cacheable in WebFlux apps for things like this?

snicoll commented 2 years ago

I am not sure what you're asking. Yes, it is expected that this doesn't work on reactive types.

ManikantaGembali commented 2 years ago

Is there any plan to fix the above issue?

Note: Caffeine's AsyncCache can help, but we need to write code manually instead of using @Cacheable.

bclozel commented 2 years ago

As far as I understand, here's the status of this issue: @Cacheable is meant as an abstraction model for many caching libraries and APIs, letting you use a common programming model on top of many implementations. In the case of reactive, few vendors support the async model right now so any short term solution we could adopt might not reflect the approach and API semantics chosen by the majority in the future.

In short, if you'd like this issue to make progress, you should reach out to your favorite cache library vendor, ask for async/reactive support, and work with them on the use case.

jaesuk-kim0808 commented 2 years ago

I agree with bclozel's comment. I'm currently in a situation where I need a caching feature within a reactive application implemented with WebFlux. So I tried to solve this problem with Caffeine's future-based asynchronous API and Mono.toFuture/Mono.fromFuture (reactor/reactor-addons#237), and implemented it using spring-aop and a custom cache manager and annotation like this:

/**
I created AsyncCacheManager and @AsyncCacheable to use instead of the CacheManager and @Cacheable features of spring-cache.
An AsyncCache is created for each method annotated with @AsyncCacheable in a BeanPostProcessor, and is then looked up from the AsyncCacheManager when needed.
AsyncCacheManager is implemented as a ConcurrentHashMap<String, AsyncCache>.
When calling asyncCache.get(), put Mono.defer(method).toFuture() in the mapping function, and return via Mono.fromFuture.
*/

@Around("pointcut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
    ...
    AsyncCache asyncCache = asyncCacheManager.get(cacheName);
    if (Objects.isNull(asyncCache)) {
        return joinPoint.proceed();
    }

    //Return type : Mono
    Mono retVal = Mono.defer(() -> {
        try {
            return (Mono) joinPoint.proceed();
        } catch (Throwable th) {
            return Mono.error(th); // error handling
        }
    });

    CompletableFuture completableFuture = asyncCache.get(generateKey(args), (key, exec) -> (retVal).toFuture());
    return Objects.nonNull(completableFuture) ? Mono.fromFuture(completableFuture) : retVal;
}

Before visiting this issue, I got help from ben-manes of the Caffeine project (https://github.com/ben-manes/caffeine/discussions/500). Finally, I checked for blocking sections using Reactor's BlockHound (https://github.com/reactor/BlockHound) and made sure it worked as intended.

Are there any expected problems with using it this way? I am writing this to share it with people who are experiencing the same problem. If you have a better way, please share.

smilejh commented 2 years ago

Hi @jaesuk-kim0808, your solution is very interesting. I have the same problem. Could you please share your solution code?

jaesuk-kim0808 commented 2 years ago

Hi, here is the code I use. It has not yet been used in a real product; performance and functionality will be verified through load tests across various scenarios in the near future. Hope this helps. Please let me know if there is a problem.

@RequiredArgsConstructor
@Aspect
@Component
public class AsyncCacheAspect {

    private final AsyncCacheManager asyncCacheManager;

    @Pointcut("@annotation(AsyncCacheable)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        ParameterizedType parameterizedType = (ParameterizedType) method.getGenericReturnType();
        Type rawType = parameterizedType.getRawType();

        if (!rawType.equals(Mono.class) && !rawType.equals(Flux.class)) {
            throw new IllegalArgumentException("The return type is not Mono/Flux. Use Mono/Flux for return type. method: " + method.getName());
        }

        AsyncCacheable asyncCacheable = method.getAnnotation(AsyncCacheable.class);
        String cacheName = asyncCacheable.name();
        Object[] args = joinPoint.getArgs();

        AsyncCache asyncCache = asyncCacheManager.get(cacheName);
        if (Objects.isNull(asyncCache)) {
            return joinPoint.proceed();
        }

        //Return type : Mono
        if (rawType.equals(Mono.class)) {
            Mono retVal = Mono.defer(() -> {
                try {
                    return (Mono) joinPoint.proceed();
                } catch (Throwable th) {
                    throw new BusinessException(ResultCode.UNKNOWN_ERROR, th.getMessage());
                }
            });

            CompletableFuture completableFuture = asyncCache.get(generateKey(args), (key, exec) -> (retVal).toFuture());
            return Mono.fromFuture(completableFuture);
        }

        //Return type : Flux
        Mono retVal = Mono.from(Flux.defer(() -> {
            try {
                return ((Flux) joinPoint.proceed()).collectList();
            } catch (Throwable th) {
                throw new BusinessException(ResultCode.UNKNOWN_ERROR, th.getMessage());
            }
        }));

        CompletableFuture<List> completableFuture = asyncCache.get(generateKey(args), (key, exec) -> (retVal).toFuture());
        return Flux.from(Mono.fromFuture(completableFuture)).flatMap(x -> Flux.fromIterable(x));
    }

    private String generateKey(Object... objects) {
        return Arrays.stream(objects)
            .map(obj -> obj == null ? "" : obj.toString())
            .collect(Collectors.joining("#"));
    }
}

@RequiredArgsConstructor
@Component
public class AsyncCacheableMethodProcessor implements BeanPostProcessor {

    private final AsyncCacheManager asyncCacheManager;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

        Arrays.stream(bean.getClass().getDeclaredMethods())
            .filter(m -> m.isAnnotationPresent(AsyncCacheable.class))
            .forEach(m -> {
                AsyncCacheable asyncCacheable = m.getAnnotation(AsyncCacheable.class);
                String cacheName = asyncCacheable.name();
                CacheType cacheType = CacheType.nameOf(cacheName);
                if (Objects.nonNull(cacheType)) {
                    asyncCacheManager.computeIfAbsent(cacheName, (key) ->  {
                        return Caffeine.newBuilder()
                            .maximumSize(cacheType.getMaximumSize())
                            .expireAfterWrite(cacheType.getExpiredAfterWrite(), TimeUnit.SECONDS)
                            .buildAsync();
                    });
                }
            });

        return bean;
    }

}

//This code is in Data access layer.
@AsyncCacheable(name = "getBySvcId")
@Override
public Mono<Domain> getBySvcId(String svcId) {}

AsyncCacheManager is a Spring bean holding a ConcurrentHashMap<String, AsyncCache>. CacheType is an enum that defines the key to be stored in AsyncCacheManager, along with the maximumSize and expireAfterWrite values required to create Caffeine's AsyncCache.
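As a rough, dependency-free stand-in for the described setup (not the author's actual code), each Caffeine AsyncCache is replaced here by a plain map of futures so the registry wiring can be shown on its own:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Simplified sketch of the described AsyncCacheManager: a named registry of
// async caches, keyed by cache name.
public class SimpleAsyncCacheManager {
    private final ConcurrentMap<String, ConcurrentMap<String, CompletableFuture<Object>>> caches =
        new ConcurrentHashMap<>();

    // Called from the BeanPostProcessor when an @AsyncCacheable method is found.
    public ConcurrentMap<String, CompletableFuture<Object>> computeIfAbsent(
            String cacheName,
            Function<String, ConcurrentMap<String, CompletableFuture<Object>>> factory) {
        return caches.computeIfAbsent(cacheName, factory);
    }

    // Called from the @Around advice; null means no cache was registered.
    public ConcurrentMap<String, CompletableFuture<Object>> get(String cacheName) {
        return caches.get(cacheName);
    }
}
```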

smilejh commented 2 years ago

Thank you. This helped me a lot.

kidhack83 commented 2 years ago

Hi @jaesuk-kim0808, I'm integrating your code, but the AOP is not invoked...

//This code is in Data access layer.
@AsyncCacheable(name = "getBySvcId")
@Override
public Mono<Domain> getBySvcId(String svcId) {}

Can the method to be cached be a private method in a service implementation? For example:

@AsyncCacheable(name = "getCurrencies")
private Mono<List<RateDto>> getCurrencies(String source, String target) {
  return currencyRatesApi.getCurrencyRates()
      .collectList();
}

The parameters aren't used in the code because I need to cache all elements. I need to change this code, but the pointcut is not triggered. I tried changing the method to public, but the AOP is still not invoked...

Thanks for your great work

vgaborabs commented 1 year ago

Hi @kidhack83, as far as I know, Spring AOP advice relies on proxies, which means it does not fire when methods are called from within the same instance. Try to adjust your code accordingly:

@Component
public class ServiceA {
  ...
  @AsyncCacheable(name = "getCurrencies")
  public Mono<List<RateDto>> getCurrencies(String source, String target) {
    return currencyRatesApi.getCurrencyRates()
        .collectList();
  }
  ...
}

@Component
@RequiredArgsConstructor
public class ServiceB {
  private final ServiceA serviceA;
  ...
  private Mono<List<RateDto>> someMethod(String source, String target) {
     return serviceA.getCurrencies(source, target);
  }
  ...
}

I hope this helps.

shaikezr commented 1 year ago

I took the solutions in this thread and made a more fleshed out version that includes the annotation, the imports, and the dependencies: https://github.com/shaikezr/async-cacheable

jhoeller commented 1 year ago

While we still do not see @Cacheable and co as a perfect companion for reactive service methods, we come back to this now from a pragmatic perspective: It is quite straightforward to support @Cacheable for CompletableFuture-returning methods, and with reasonable adaptation effort, the same approach can work for single-value reactive types such as Mono. With Flux, annotation-driven caching is never going to be an ideal fit, but when asked for it through a corresponding declaration, the best we can do is to collect the Flux outcome into a List and cache that, and then restore a Flux from there in case of a cache hit.

On the provider side, we have Caffeine's CompletableFuture-based AsyncCache, which is a fine fit for the approach above. This allows for individual cache annotation processing as well as an @Cacheable(sync=true) style synchronized approach, with very similar semantics to cache annotation usage on imperative methods. All it takes SPI-wise is two new methods on Spring's Cache interface: CompletableFuture retrieve(key) for plain retrieval and CompletableFuture retrieve(key, Supplier<CompletableFuture>) for sync-style retrieval. These are designed for an efficient binding in CaffeineCache and can also be implemented in ConcurrentMapCache with some CompletableFuture.supplyAsync usage.
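As an illustration only (this is a hypothetical sketch, not Spring's actual ConcurrentMapCache or CaffeineCache code), the two retrieve-style operations could be backed by a plain ConcurrentHashMap of futures like this:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

public class RetrieveSketch<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> store = new ConcurrentHashMap<>();

    // Plain retrieval: a null result means "not cached".
    public CompletableFuture<V> retrieve(K key) {
        return store.get(key);
    }

    // Sync-style retrieval: the loader runs at most once per key, which is
    // what guards a sync-style @Cacheable against cache stampedes.
    public CompletableFuture<V> retrieve(K key, Supplier<CompletableFuture<V>> loader) {
        return store.computeIfAbsent(key, k -> loader.get());
    }
}
```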

There are a few caveats: In order to address the risk of cache stampedes for expensive operations, @Cacheable(sync=true) needs to be declared (just like with our imperative caching arrangement), with only one such caching annotation allowed on a given method then. Also, we expect Cache.evict, Cache.clear and also Cache.put to be effectively non-blocking for individual usage underneath @CacheEvict and @CachePut. This is the case with our common cache providers unless configured for immediate write-through, which none of our retrieve-supporting providers have as a feature anyway. So in summary, for providers supporting our retrieve operations, evict/clear/put need to be effectively non-blocking in their implementation. This matches the existing semantic definition of those methods, where they already allow for asynchronous or deferred backend execution, e.g. in transactional scenarios, with callers not expecting to see an immediate effect when querying the cache right afterwards. The javadoc of those methods now hints at the non-blocking requirement for reactive interactions as well.

Thanks everyone for your comments in this thread! This served as very valuable inspiration.

ben-manes commented 1 year ago

With Flux, annotation-driven caching is never going to be an ideal fit, but when asked for it through a corresponding declaration, the best we can do is to collect the Flux outcome into a List and cache that, and then restore a Flux from there in case of a cache hit.

One somewhat related area where the Flux reactive type is very useful is to support coalescing bulk loads. This allows for collecting multiple independent loads using Reactor's bufferTimeout(maxSize, maxTime) to perform a batch request. I don't believe the annotations support bulk operations yet, but it is a very nice mash-up.

Here is a short (~30 LOC) example demonstrating this using Caffeine directly.

CoalescingBulkLoader

```java
public final class CoalescingBulkLoader<K, V> implements AsyncCacheLoader<K, V> {
  private final Function<Set<K>, Map<K, V>> mappingFunction;
  private final Sinks.Many<Request<K, V>> sink;

  public CoalescingBulkLoader(int maxSize, Duration maxTime, int parallelism,
      Function<Set<K>, Map<K, V>> mappingFunction) {
    this.mappingFunction = requireNonNull(mappingFunction);
    sink = Sinks.many().unicast().onBackpressureBuffer();
    sink.asFlux()
        .bufferTimeout(maxSize, maxTime)
        .parallel(parallelism)
        .runOn(Schedulers.boundedElastic())
        .subscribe(this::handle);
  }

  @Override public CompletableFuture<V> asyncLoad(K key, Executor executor) {
    var result = new CompletableFuture<V>();
    sink.tryEmitNext(new Request<>(key, result)).orThrow();
    return result;
  }

  private void handle(List<Request<K, V>> requests) {
    try {
      var results = mappingFunction.apply(
          requests.stream().map(Request::key).collect(toSet()));
      requests.forEach(request -> request.result.complete(results.get(request.key())));
    } catch (Throwable t) {
      requests.forEach(request -> request.result.completeExceptionally(t));
    }
  }

  private record Request<K, V>(K key, CompletableFuture<V> result) {}
}
```

Sample test

```java
@Test
public void coalesce() {
  AsyncLoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
      .buildAsync(new CoalescingBulkLoader<>(
          /* maxSize */ 5, /* maxTime */ Duration.ofMillis(50), /* parallelism */ 5,
          keys -> keys.stream().collect(toMap(key -> key, key -> -key))));

  var results = new HashMap<Integer, CompletableFuture<Integer>>();
  for (int i = 0; i < 82; i++) {
    results.put(i, cache.get(i));
  }
  for (var entry : results.entrySet()) {
    assertThat(entry.getValue().join()).isEqualTo(-entry.getKey());
  }
}
```
dalegaspi commented 1 year ago

just to throw things into the mix... there are a couple of additional challenges we sorta need solved that would still keep the annotation approach:

  1. refreshAfterWrite support. probably doesn't need too much explanation: most of the time we are ok with returning (old/stale) cached data while key is being refreshed asynchronously. we have an implementation that's in the process of being tested as of this writing.
  2. Conditional caching (aka unless or condition attribute). this is a bit trickier and we are running into thread-safety issues.

we are going to share code snippets when we're a bit more confident that it actually works 🙃

oh and one more thing: I see the implementations here casually using .toFuture() to convert a reactive stream to a CompletableFuture... which is fine for the most part, but be mindful when you have operations that can potentially stall the reactive threads because you converted to a CompletableFuture (we learned this the hard way). This can be mitigated by switching thread pools using publishOn or subscribeOn where applicable.
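A plain-Java analogue of that mitigation (CompletableFuture instead of Reactor; the "worker" pool name is made up): thenApplyAsync with a dedicated executor plays the role of publishOn, keeping the continuation off whichever thread completed the future, e.g. an event-loop thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadDemo {
    // Returns the name of the thread that ran the continuation.
    public static String continuationThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            CompletableFuture<Integer> source = new CompletableFuture<>();
            // thenApply would run the continuation on the completing thread;
            // thenApplyAsync(fn, worker) offloads it to the dedicated pool instead.
            CompletableFuture<String> offloaded =
                source.thenApplyAsync(v -> Thread.currentThread().getName(), worker);
            source.complete(42);
            return offloaded.join();
        } finally {
            worker.shutdown();
        }
    }
}
```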

jhoeller commented 1 year ago

The initial drop is in main now, feel free to check it out... The first milestone to include this will be 6.1 M4 in mid August.

mschaeferjelli commented 7 months ago

I may be running into thread issues with unless. With 6.1.5, I'm getting:

org.springframework.expression.spel.SpelEvaluationException: EL1011E: Method call: Attempted to call method concat(java.lang.String) on null context object
    at org.springframework.expression.spel.ast.MethodReference.throwIfNotNullSafe(MethodReference.java:166)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:117)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:107)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueRef(CompoundExpression.java:67)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:97)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:114)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:273)
    at org.springframework.cache.interceptor.CacheOperationExpressionEvaluator.key(CacheOperationExpressionEvaluator.java:106)
    at org.springframework.cache.interceptor.CacheAspectSupport$CacheOperationContext.generateKey(CacheAspectSupport.java:913)
    at org.springframework.cache.interceptor.CacheAspectSupport.generateKey(CacheAspectSupport.java:703)
    at org.springframework.cache.interceptor.CacheAspectSupport.findCachedValue(CacheAspectSupport.java:480)
    at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:431)
    at org.springframework.cache.interceptor.CacheAspectSupport.execute(CacheAspectSupport.java:395)
    at org.springframework.cache.interceptor.CacheInterceptor.invoke(CacheInterceptor.java:74)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717)

This is the function header and annotations:

@Cacheable(value = "broadcaster_id_cache", key = "#siteExternalId.concat('-').concat(#adNetworkIdentity)", unless = "#result == null")
public Optional<Long> getBroadcasterId(final String siteExternalId, final String adNetworkIdentity) {

I confirm that the values are not null in the function. I've downgraded to 6.0 and it's working correctly.

sbrannen commented 7 months ago

@mschaeferjelli, you have to compile your code with the -parameters flag.

This is documented in the 6.1 upgrade notes: https://github.com/spring-projects/spring-framework/wiki/Upgrading-to-Spring-Framework-6.x#parameter-name-retention

mschaeferjelli commented 7 months ago

That worked, thanks! As a note, I had to upgrade the maven-compiler-plugin from 3.8.1 to 3.13.1, because it was not honoring the XML `<parameters>true</parameters>` configuration.