spring-projects / spring-data-r2dbc

Provide support to increase developer productivity in Java when using Reactive Relational Database Connectivity. Uses familiar Spring concepts such as a DatabaseClient for core API usage and lightweight repository style data access.
Apache License 2.0
708 stars 132 forks source link

r2dbcrepository always run in single thread #818

Closed vavisha closed 1 year ago

vavisha commented 1 year ago

Versions

Current Behavior

After the reactive repository (R2dbcRepository), operators are executed in one thread (reactor-tcp-nio-1) out of 10 available

Table schema

CREATE TABLE value (
  id bigint NOT NULL GENERATED ALWAYS AS IDENTITY,
  value varchar null
)

simple example app with docker-compose.yml in zip file : r2dbc.zip log: r2dbc.log

Code ```java package com.example; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class R2DBCApplication { private static final Logger log = LoggerFactory.getLogger(R2DBCApplication.class); public static void main(final String[] args) { SpringApplication.run(R2DBCApplication.class, args); } } ``` ```java package com.example; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; import java.util.UUID; @RestController public class ValueController { private static final Logger log = LoggerFactory.getLogger(ValueController.class); @Autowired private ValueRepository repository; @GetMapping("/value") public Mono get() { final var uuid = UUID.randomUUID().toString(); log.info("start for uuid = {}", uuid); return repository .findById(1L) .map(ValueEntity::getId) .map(String::valueOf) .switchIfEmpty(Mono.fromSupplier(() -> uuid)) .doOnNext(v -> log.info("after switchIfEmpty uuid = {}", v)) .doOnNext(v -> log.info("breakpoint uuid = {}", v)); } } ``` ```java package com.example; import org.springframework.data.annotation.Id; import org.springframework.data.relational.core.mapping.Column; import org.springframework.data.relational.core.mapping.Table; @Table("value") public class ValueEntity { @Id @Column("id") private Long id; @Column("value") private String value; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getValue() { return value; } public void setValue(final String value) { this.value = value; } } ``` ```java package com.example; import org.springframework.data.r2dbc.repository.R2dbcRepository; public interface ValueRepository extends R2dbcRepository { } ``` ```yml server.port: 9080 spring: application.name: r2dbc-service r2dbc: url: r2dbc:postgresql://localhost:5433/postgres username: postgres password: 12345 properties: schema: public logging.level.root: DEBUG ``` ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 3.0.2 com.example demo 0.0.1-SNAPSHOT demo Demo project for Spring Boot 17 org.springframework.boot spring-boot-starter-webflux org.springframework.boot spring-boot-starter-data-r2dbc org.postgresql r2dbc-postgresql runtime org.springframework.boot spring-boot-maven-plugin ```

Steps to reproduce

  1. set breakpoint in ValueController on lambda (not line) in .doOnNext(v -> log.info("breakpoint uuid = {}", v))
  2. open url http://localhost:9080/value in browser
  3. open url http://localhost:9080/value in browser

result: breakpoint won't hit a second time (reactor-tcp-nio-1 stopped, second connection wait while reactor-tcp-nio-1 release)

Expected behavior

After the reactive repository (R2dbcRepository), operators are executed in separated r2dbc's threads/workers (thread per connection, e.g. reactor-tcp-nio-1 and reactor-tcp-nio-2)

mp911de commented 1 year ago

This isn't a Spring Data issue, it relates to the netty event loop. Each connection is associated with a unique event loop Thread and if you use the same R2DBC connection (you can check the PID), then your code hits the same thread which is the most efficient way of working.

Unless you concurrently use multiple connections, you won't see that other IO threads will be utilized.