mirromutth / r2dbc-mysql

R2DBC MySQL Implementation
Apache License 2.0
656 stars 100 forks source link

Driver drops onNext after publishing results on another thread (e.g. boundedElastic) #176

Open AlexeiKhilchuk opened 3 years ago

AlexeiKhilchuk commented 3 years ago

Hello. I'm facing the issue when DatabaseClient do not respond on requests on load that causes stream to hang. It reproduces with the test that is provided below. Higher chance to reproduce occurs when I'm trying to publish results of database client on boundedElastic().This stream never completes. Am I doing something wrong here or it is a bug? :)

Notes:

I'm ready to provide additional info for you if needed ASAP. Thanks in advance!

gradle dependencies:

implementation("org.springframework.boot", "spring-boot-starter-data-r2dbc")
implementation("dev.miku", "r2dbc-mysql", "0.8.2.RELEASE")
implementation("io.r2dbc:r2dbc-pool:0.8.6.RELEASE")

MySQL version: MySQL Server 8.0.23 (Docker image)

R2DBC Config:

spring:
  r2dbc:
    url: r2dbcs:mysql://localhost:3308:test_db?serverTimezone=Europe/Moscow&useUnicode=yes&characterEncoding=UTF-8
    username: test_user
    password: password
    pool:
      enabled: true
      max-size: 128
      initial-size: 128
      max-idle-time: PT1M
      validation-query: "SELECT 1"

Test code:

@ExtendWith(SpringExtension::class)
@SpringBootTest
class DebugTest @Autowired constructor(
    private val databaseClient: DatabaseClient,
)  {

    @Test
    fun `db test`() {
        val publishScheduler = Schedulers.newParallel("my-parallel", 512)

        //Clear table
        databaseClient
            .delete()
            .from(ColorEntity::class.java)
            .then()
            .block()

        //Insert entities into the table
        Flux.range(0, 50000)
            .publishOn(publishScheduler)
            .map { ColorEntity(UUID.fromString("1-1-1-1-$it"), "q", "s", "s", "s") }
            .flatMap { entity ->
                databaseClient
                    .insert()
                    .into(ColorEntity::class.java)
                    .using(entity)
                    .then()
                    .thenReturn(entity)
                    .publishOn(Schedulers.boundedElastic()) 
            }
            .collectList()
            .block()
    }
}

@Table("color")
data class ColorEntity(
    @Id
    val id: UUID,
    val name: String,
    val hexCode: String,
    val manufactureCode: String,
    val imageUrl: String
)
mp911de commented 3 years ago

Looking at the test it's not clear whether you're using a connection pool or not. Fan out to 512 concurrent threads can easily overload the target server or hit other system limitations. Care to attach a debug log as file?

AlexeiKhilchuk commented 3 years ago

Hey, thanks for your response. Attaching log file under debug level. As I see in DefaultDatabaseClient bean implementation it uses r2dbc pool from connection factory that is enabled in spring r2dbc config. log.txt