spring-projects / spring-data-r2dbc

Provide support to increase developer productivity in Java when using Reactive Relational Database Connectivity. Uses familiar Spring concepts such as a DatabaseClient for core API usage and lightweight repository style data access.
Apache License 2.0

Unexpected behaviour when using repository saveAll() in WebFilter with response body #857

Closed ptrblgh closed 5 months ago

ptrblgh commented 5 months ago

Hi,

I would like to call a repository saveAll method in a WebFilter class, but when the handler returns a ServerResponse with a body, saveAll doesn't save anything to the DB. When the response has no body, the data is saved.

Here are the sources that I use.

Router with handlers:

@Configuration
@Slf4j
public class ResponseBodyRouter {
    @Bean
    public RouterFunction<ServerResponse> responseBodyRoute() {
        return route()
                .GET("/with-response-body", this::withResponseBodyHandler)
                .GET("/without-response-body", this::withoutResponseBodyHandler)
                .build();
    }

    private Mono<ServerResponse> withResponseBodyHandler(ServerRequest req) {
        return ServerResponse
                .ok()
                .bodyValue("Body\n");
    }

    private Mono<ServerResponse> withoutResponseBodyHandler(ServerRequest req) {
        return ServerResponse
                .ok()
                .build();
    }
}

WebFilter:

@Component
@Order(Ordered.LOWEST_PRECEDENCE)
@Slf4j
public class LogWebFilter implements WebFilter {

    @Autowired
    private UserOperationLogRepository userOperationLogRepository;

    @Override
    @NonNull
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        return chain
                .filter(exchange)
                .then(logMessage(exchange));
    }

    public Mono<Void> logMessage(ServerWebExchange exchange) {
        return Mono.deferContextual(ctx -> {
            var list = new ArrayList<UserOperationLog>();
            var userOperationLog = new UserOperationLog();

            userOperationLog.setAction((short) 1);
            userOperationLog.setPayload("payload");
            userOperationLog.setResource("Client");
            userOperationLog.setRowId(1L);
            userOperationLog.setCreatedAt(OffsetDateTime.now());
            userOperationLog.setUserId(1L);

            list.add(userOperationLog);

            return userOperationLogRepository.saveAll(list)
                    .log()
                    .collectList()
                    .then();
        });
    }
}

The bean:

@Data
@Table("user_operation_log")
public class UserOperationLog implements Persistable<Long> {

    @Id
    @Column("user_operation_log_id")
    protected Long id;

    @Column("user_id")
    protected Long userId;

    @Column("action")
    protected Short action;

    @Column("resource")
    protected String resource;

    @Column("row_id")
    protected Long rowId;

    @Column("payload")
    protected String payload;

    @Column("error")
    protected String error;

    @Column("created_at")
    protected OffsetDateTime createdAt;

    @Override
    public boolean isNew() {
        return true;
    }
}

Reactive stream signal logs without response body:

onSubscribe(FluxUsingWhen.UsingWhenSubscriber)
request(unbounded)
onNext(UserOperationLog(id=102, userId=1, action=1, resource=Client, rowId=1, payload=payload, error=null, createdAt=2024-04-05T12:20:07.990301862+02:00))
onComplete()

Reactive stream signal logs with response body:

onSubscribe(FluxUsingWhen.UsingWhenSubscriber)
request(unbounded)
cancel()

mp911de commented 5 months ago

The difference seems to be the cancel signal. I think you can reduce your reproducer to Mono.just(…).log().then() and this isn't a Spring Data issue. You might want to reach out to the framework team asking why the filter Mono subscription sees a cancellation signal.

ptrblgh commented 5 months ago

Thank you for the quick response. However, if I replace the repository call with a scalar source or a FluxIterable source, it works as expected. That is why I thought that the repository's save/saveAll method handles something differently when there's a body in the response.

With Mono.just("String"):

onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
request(unbounded)
onNext([UserOperationLog(id=null, userId=1, action=1, resource=Client, rowId=1, payload=payload, error=null, createdAt=2024-04-05T14:19:31.461544039+02:00)])
onComplete()

With Flux.fromIterable(list):

onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
request(unbounded)
onNext(UserOperationLog(id=null, userId=1, action=1, resource=Client, rowId=1, payload=payload, error=null, createdAt=2024-04-05T14:16:19.198144534+02:00))
onComplete()
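
One possible reading of these logs, offered as an assumption rather than a confirmed diagnosis: the scalar and iterable sources emit synchronously inside request(unbounded), so they finish before any cancel() can arrive, while the repository call has to wait for the database and is cancelled first. The sketch below models only that ordering with the JDK's java.util.concurrent.Flow interfaces. It is not Reactor, WebFlux, or Spring Data code, and the names CancelDemo, source, and signals are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Flow;

public class CancelDemo {

    /**
     * Publishes a single item. A synchronous source emits inside request();
     * an asynchronous one only queues the emission, standing in for a
     * database round trip that completes later.
     */
    static Flow.Publisher<String> source(boolean async, Queue<Runnable> pending) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean cancelled;
            private boolean done;

            @Override
            public void request(long n) {
                Runnable emit = () -> {
                    if (cancelled || done) {
                        return; // a cancelled subscription must not emit
                    }
                    done = true;
                    subscriber.onNext("row");
                    subscriber.onComplete();
                };
                if (async) {
                    pending.add(emit); // reply arrives later
                } else {
                    emit.run();        // reply arrives before request() returns
                }
            }

            @Override
            public void cancel() {
                cancelled = true;
            }
        });
    }

    /** Subscribes, requests everything, cancels, then lets queued work run. */
    static List<String> signals(boolean async) {
        Queue<Runnable> pending = new ArrayDeque<>();
        List<String> seen = new ArrayList<>();
        Flow.Subscription[] handle = new Flow.Subscription[1];

        source(async, pending).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                handle[0] = s;
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(String item) {
                seen.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable t) {
                seen.add("onError");
            }

            @Override
            public void onComplete() {
                seen.add("onComplete()");
            }
        });

        // Assumption: the cancel arrives right after request() has returned.
        handle[0].cancel();
        // The deferred "database reply" is processed only now, too late to emit.
        pending.forEach(Runnable::run);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("sync:  " + signals(false)); // sync:  [onNext(row), onComplete()]
        System.out.println("async: " + signals(true));  // async: []
    }
}
```

In this model the synchronous source has already completed by the time cancel() is called, which matches the Mono.just and Flux.fromIterable logs above, while the deferred source sees the cancel first and emits nothing, which matches the saveAll log with a response body. Why the filter's subscription is cancelled in the first place is not answered by this sketch.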

ptrblgh commented 5 months ago

I have worked around this problem with a different approach, registering the save through the response's beforeCommit function in the WebFilter:

@Component
@Order(Ordered.LOWEST_PRECEDENCE)
@Slf4j
public class LogWebFilter implements WebFilter {

    @Autowired
    private UserOperationLogRepository userOperationLogRepository;

    @Override
    @NonNull
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        exchange.getResponse().beforeCommit(() -> Mono.deferContextual(ctx -> {
            var list = new ArrayList<UserOperationLog>();
            var userOperationLog = new UserOperationLog();

            userOperationLog.setAction((short) 1);
            userOperationLog.setPayload("payload");
            userOperationLog.setResource("Client");
            userOperationLog.setRowId(1L);
            userOperationLog.setCreatedAt(OffsetDateTime.now());
            userOperationLog.setUserId(1L);

            list.add(userOperationLog);

            return userOperationLogRepository.saveAll(list).collectList().then();
        }));

        return chain
                .filter(exchange);
    }
}