spring-projects / spring-ai

An Application Framework for AI Engineering
https://docs.spring.io/spring-ai/reference/index.html
Apache License 2.0

Support asynchronous/reactive function calling #1778

Open: PeatBoy opened this issue 2 days ago

PeatBoy commented 2 days ago


Current Behavior

The current FunctionCallback only supports blocking functions. If the function behind the callback is built on Reactor's streaming APIs, its result can only be obtained by blocking, either with block() or by offloading the work to an asynchronous thread pool.

Expected Behavior

I would like a FunctionCallback that can be backed by Reactor-based functions, for example a BiFunction<Request, ToolContext, Mono<Response>>.
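To make the requested shape concrete, here is a minimal sketch of an asynchronous callback type together with the blocking bridge users currently have to write by hand. It is not Spring AI API: `AsyncFunctionCallback`, `Request`, `Response`, `ToolContext` and `callBlocking` are hypothetical names (`ToolContext` is only a stand-in for Spring AI's class), and `CompletionStage` replaces Reactor's `Mono` so the sketch compiles with the JDK alone (Java 16+ for records).

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

public class AsyncCallbackSketch {

    // Hypothetical request/response types standing in for the user's own classes.
    record Request(String query) {}
    record Response(String result) {}

    // Hypothetical stand-in for Spring AI's ToolContext: just a map of extra data.
    record ToolContext(Map<String, Object> context) {}

    // Hypothetical async callback shape. The issue asks for Mono<Response>;
    // CompletionStage<Response> is used here so no Reactor dependency is needed.
    interface AsyncFunctionCallback
            extends BiFunction<Request, ToolContext, CompletionStage<Response>> {}

    // Bridges an async callback to today's blocking contract by waiting for
    // the result, which is what users currently have to do themselves.
    static Response callBlocking(AsyncFunctionCallback fn, Request req, ToolContext ctx) {
        return fn.apply(req, ctx).toCompletableFuture().join();
    }

    public static void main(String[] args) {
        AsyncFunctionCallback fn = (req, ctx) ->
                CompletableFuture.supplyAsync(() -> new Response("echo:" + req.query()));
        Response r = callBlocking(fn, new Request("weather"), new ToolContext(Map.of()));
        System.out.println(r.result()); // prints "echo:weather"
    }
}
```

A framework-level reactive variant would ideally keep the result asynchronous end to end instead of calling `join()`; the bridge only shows where the blocking currently happens.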

Context

This would be friendlier for applications built entirely on WebFlux; using a blocking FunctionCallback in WebFlux causes problems that are hard to work around, for example blocking calls that stall the event loop.

More information

tzolov commented 1 hour ago

@PeatBoy - Thanks for flagging this. Reactive FunctionCallback isn't currently supported. While we (@chemicL) may consider adding such a feature in the future, for now you'll need to use a blocking call instead of .subscribe(res::add).

chemicL commented 1 hour ago

Hey, as @tzolov mentioned, reactive/asynchronous signatures for functions are currently not supported, but this looks like desirable functionality, especially for streaming scenarios.

I suggest closing the discussion #1757 and keeping this issue and related discussions here.

As you noticed:

    ArrayList<Object> res = new ArrayList<>();
        // ...
        .subscribe(res::add);
    return new Response(res);

This construct immediately returns an incomplete object that another thread later mutates without any synchronization (so the added results may never even become visible to the consuming thread). That is undesirable.
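The hazard can be reproduced with the JDK alone. In this sketch a `CompletableFuture` stands in for the reactive query (an assumption; the original code uses Reactor), `unsafe` mirrors the `.subscribe(res::add)` pattern, and `safe` waits for the complete result before building the response, which is what `.collectList().block()` does in Reactor. `RaceSketch`, `queryAsync` and `Response` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RaceSketch {

    // Hypothetical response type wrapping the collected rows.
    record Response(List<Object> items) {}

    // Hypothetical async source standing in for the reactive repository query.
    static CompletableFuture<List<Object>> queryAsync(ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> {
            List<Object> rows = new ArrayList<>();
            rows.add("row-1");
            rows.add("row-2");
            return rows;
        }, pool);
    }

    // UNSAFE, mirrors `.subscribe(res::add); return new Response(res);`:
    // the Response is built before the async work has necessarily finished,
    // and the ArrayList is mutated without synchronization, so the caller
    // may observe an empty or partially filled list.
    static Response unsafe(ExecutorService pool) {
        List<Object> res = new ArrayList<>();
        queryAsync(pool).thenAccept(res::addAll);
        return new Response(res);
    }

    // SAFE, the JDK counterpart of `.collectList().block()`: wait for the
    // complete list first, then build the Response from it.
    static Response safe(ExecutorService pool) {
        return new Response(queryAsync(pool).join());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(safe(pool).items()); // prints "[row-1, row-2]"
        } finally {
            pool.shutdown();
        }
    }
}
```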

As a workaround for the time being, there is no need to wrap the call in a CompletableFuture, as in:

    List<Map<String, Object>> data = CompletableFuture.supplyAsync(
            () -> this.dataQueryRepository.query()
                // ...
                .collectList()
                .block())
        .join();

Instead,

    List<Map<String, Object>> data = this.dataQueryRepository.query()
        // ...
        .collectList()
        .block();

should get you the same result.
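A JDK-only illustration of why the wrapper buys nothing: `supplyAsync(...).join()` moves the blocking work to the common pool, yet the calling thread still waits in `join()`, so it ends up with the same result and the same blocking as calling the query directly. `WorkaroundSketch` and `blockingQuery` are hypothetical; `blockingQuery` stands in for the `dataQueryRepository.query()...collectList().block()` chain.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class WorkaroundSketch {

    // Hypothetical blocking query standing in for the repository chain ending in block().
    static List<Map<String, Object>> blockingQuery() {
        return List.of(Map.of("id", 1), Map.of("id", 2));
    }

    // Wrapped: hops to the common ForkJoinPool, but the caller still blocks in join().
    static List<Map<String, Object>> wrapped() {
        return CompletableFuture.supplyAsync(WorkaroundSketch::blockingQuery).join();
    }

    // Direct: same result, same blocking of the caller, no extra thread hop.
    static List<Map<String, Object>> direct() {
        return blockingQuery();
    }

    public static void main(String[] args) {
        System.out.println(wrapped().equals(direct())); // prints "true"
    }
}
```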

For Spring AI's internal implementation, keep in mind that when an imperative user function is executed on an event loop, the blocking call should be offloaded to a bounded-elastic Scheduler, on which blocking is allowed, e.g. via:

        return Mono.fromCallable(() -> userBlockingFunction())
            .subscribeOn(Schedulers.boundedElastic());

regardless of whether the function uses reactive APIs internally, since blocking calls made from imperative code degrade performance and stall the event loop.
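For readers more familiar with `java.util.concurrent`, a rough JDK analogue of `Mono.fromCallable(...).subscribeOn(Schedulers.boundedElastic())` is to submit the blocking call to a dedicated, size-capped executor. This is a sketch, not how Spring AI or Reactor implement it: the pool size of 4 and the names `OffloadSketch`, `BLOCKING_POOL` and `offload` are arbitrary assumptions, a fixed pool only approximates boundedElastic (which also creates and reclaims threads on demand), and unlike `Mono.fromCallable`, which does nothing until subscribed, `CompletableFuture.supplyAsync` starts the work immediately.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class OffloadSketch {

    // Rough stand-in for Schedulers.boundedElastic(): a capped pool whose
    // threads are allowed to block. The size of 4 is an arbitrary assumption.
    static final ExecutorService BLOCKING_POOL = Executors.newFixedThreadPool(4);

    // Analogue of Mono.fromCallable(fn).subscribeOn(Schedulers.boundedElastic()):
    // the caller (e.g. an event-loop thread) gets a future back at once while
    // the blocking work runs on BLOCKING_POOL. Unlike a Mono, this starts eagerly.
    static <T> CompletableFuture<T> offload(Supplier<T> blockingFunction) {
        return CompletableFuture.supplyAsync(blockingFunction, BLOCKING_POOL);
    }

    // Hypothetical imperative user function that blocks.
    static String userBlockingFunction() {
        try {
            Thread.sleep(50); // simulate blocking I/O
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result";
    }

    public static void main(String[] args) {
        try {
            CompletableFuture<String> f = offload(OffloadSketch::userBlockingFunction);
            // join() is used only because this demo's main thread is not an event loop.
            System.out.println(f.join()); // prints "result"
        } finally {
            BLOCKING_POOL.shutdown();
        }
    }
}
```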