spring-projects / spring-data-r2dbc

Provide support to increase developer productivity in Java when using Reactive Relational Database Connectivity. Uses familiar Spring concepts such as a DatabaseClient for core API usage and lightweight repository style data access.
Apache License 2.0

NamedParameterUtils doesn't support collection with nullable values #845

Closed chinshaw closed 10 months ago

chinshaw commented 10 months ago

We have a case where we are upserting large numbers of daily values into multiple tables. I am going to use a simple example of our timeseries history where we use the collection of tuples to upsert values into the history table.

    @Query(
        """
        INSERT INTO time_series_history (time_series_tag_xid, timestamp, value)
            VALUES :tuples
        ON CONFLICT(time_series_tag_xid, timestamp) 
        DO UPDATE SET 
            value = EXCLUDED.value
        """
    )
    suspend fun upsertReturningCount(tuples: List<Array<Any>>)

I have benchmarked this using large collections and it is around 75% faster than our original approach of batching multiple insert statements. However, this only works with non-null values.

If you need nulls, as in suspend fun upsertReturningCount(tuples: List<Array<Any?>>), the NamedParameterUtils class does not check for null before calling bind. I created a hacky workaround but would like advice.

My solution was to override NamedParameterUtils in my project and add the following bind function.

        private void bind(org.springframework.r2dbc.core.binding.BindTarget target, Iterator<BindMarker> markers,
                          Object valueToBind) {

            Assert.isTrue(markers.hasNext(),
                    () -> String.format(
                            "No bind marker for value [%s] in SQL [%s]. Check that the query was expanded using the same arguments.",
                            valueToBind, toQuery()));

            final BindMarker marker = markers.next();

            // Check to see if the valueToBind is of type NullableParameter
            if (valueToBind instanceof NullableParameter<?> nullableParameter) {
                if (nullableParameter.getValue().isPresent()) {
                    marker.bind(target, nullableParameter.getValue().get());
                } else {
                    marker.bindNull(target, nullableParameter.getCls());
                }
            } else {
                marker.bind(target, valueToBind);
            }
        }

The issue I ran into was that I could not find a way to get the parameter's type so I created a hack class called NullableParameter.

public class NullableParameter<T> {

    private final Class<T> cls;
    private final Optional<T> value;

    public NullableParameter(Class<T> cls, Optional<T> value) {
        this.cls = cls;
        this.value = value;
    }

    public Class<T> getCls() {
        return cls;
    }

    public Optional<T> getValue() {
        return value;
    }

    public static <T> NullableParameter<T> of(Class<T> cls, T value) {
        // ofNullable, not of: Optional.of(null) throws NullPointerException
        return new NullableParameter<T>(cls, Optional.ofNullable(value));
    }
}

My question: is there any way to check a nullable parameter type? I am assuming that this is not possible but if there was a formal way to pass a nullable parameter to the statement of tuples that would be pretty nice.

Been away from Java for a few years so please don't throw shade on my use of Optionals :)

mp911de commented 10 months ago

Have you seen io.r2dbc.spi.Parameter and its factory Parameters that allows you to create bind parameter instances carrying the type and value?

chinshaw commented 10 months ago

Thanks @mp911de. Yes, that is what we have been using previously; however, it is significantly slower, by a factor of 3. Here is an example of the first edition of our batching code.

This is the abstract class that all other batching repositories inherit from:

    override fun batchUpsertList(inputFlow: List<T>): Flow<T> {
        return getCurrentAuditor()
            .flatMapMany { auditor ->
                val auditTimestamp = LocalDateTime.now()
                // Create a many connection to execute multiple inserts
                databaseClient.inConnectionMany { conn ->
                    val statement = conn.createStatement(upsertSql())
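                    // forEachIndexed: binding is a side effect, so no mapped list is needed
                    inputFlow.forEachIndexed { index, wellInput ->
                        bindStatement(statement, wellInput, auditor, auditTimestamp)
                        // add() starts a new binding set; skip it after the last row
                        if (index < inputFlow.size - 1) {
                            statement.add()
                        }
                    }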
                    statement
                        .execute()
                        .toFlux()
                        .flatMap { result ->
                            result.map(rowMapper)
                        }
                        .doFinally {
                            conn.close()
                        }
                }
            }.asFlow()
    }

And here is the code where we are binding parameters. This class is extended via our CoroutineCrudRepository.

interface CasingBatchingRepository : BatchingRepository<CasingEntity>

class CasingBatchingRepositoryImpl(
    auditorAware: ReactiveAuditorAware<String>,
    r2dbcEntityTemplate: R2dbcEntityTemplate
) : AbstractBatchingRepository<CasingEntity>(CasingEntity::class.java, r2dbcEntityTemplate, auditorAware),
    CasingBatchingRepository {

    override fun upsertSql(): String = """
                INSERT INTO casing (wellbore_xid, source_id, top_md, bottom_md, id, od, roughness, run_date, version, created_by, updated_by, created_at, updated_at) 
                    VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
                ON CONFLICT(wellbore_xid, source_id) DO UPDATE SET 
                    wellbore_xid = EXCLUDED.wellbore_xid,
                    source_id = EXCLUDED.source_id,
                    top_md = EXCLUDED.top_md,
                    bottom_md = EXCLUDED.bottom_md,
                    id = EXCLUDED.id,
                    od = EXCLUDED.od,
                    roughness = EXCLUDED.roughness,
                    run_date = EXCLUDED.run_date,
                    version = EXCLUDED.version,
                    updated_by = EXCLUDED.updated_by,
                    updated_at = EXCLUDED.updated_at
                RETURNING *
            """.trimIndent()

    override fun bindStatement(
        statement: Statement,
        casing: CasingEntity,
        auditor: String,
        auditTimestamp: LocalDateTime
    ) {
        statement.bind(0, Parameters.`in`(PostgresqlObjectId.UUID, casing.wellboreXid))
        statement.bind(1, Parameters.`in`(PostgresqlObjectId.VARCHAR, casing.sourceId))
        statement.bind(2, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.topMd))
        statement.bind(3, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.bottomMd))
        statement.bind(4, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.id))
        statement.bind(5, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.od))
        statement.bind(6, Parameters.`in`(PostgresqlObjectId.FLOAT8, casing.roughness))
        statement.bind(7, Parameters.`in`(PostgresqlObjectId.TIMESTAMP, casing.runDate))
        statement.bind(8, Parameters.`in`(PostgresqlObjectId.INT8, casing.version))
        statement.bind(9, Parameters.`in`(PostgresqlObjectId.VARCHAR, auditor))
        statement.bind(10, Parameters.`in`(PostgresqlObjectId.VARCHAR, auditor))
        statement.bind(11, Parameters.`in`(PostgresqlObjectId.TIMESTAMP, auditTimestamp))
        statement.bind(12, Parameters.`in`(PostgresqlObjectId.TIMESTAMP, auditTimestamp))
    }
}

In the sample code above, I found that we are sending a batch of statements, as opposed to the more performant approach of sending a list of tuples in a single statement.
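For illustration, the single-statement approach can be sketched in plain Java. This is a simplified model of how one `:tuples` placeholder could be expanded into a multi-row VALUES clause with PostgreSQL-style `$n` markers. It is not the actual NamedParameterUtils implementation, and `TupleExpansionSketch`, `Expanded`, and `expand` are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TupleExpansionSketch {

    /** Rewritten SQL plus the tuple values flattened in marker order. */
    static final class Expanded {
        final String sql;
        final List<Object> values;

        Expanded(String sql, List<Object> values) {
            this.sql = sql;
            this.values = values;
        }
    }

    /**
     * Replaces ":name" with one "($n, $n+1, ...)" group per tuple.
     * Naive on purpose: assumes the placeholder appears exactly once.
     */
    static Expanded expand(String sql, String name, List<Object[]> tuples) {
        StringBuilder groups = new StringBuilder();
        List<Object> values = new ArrayList<>();
        int index = 1; // PostgreSQL-style markers are 1-based
        for (Object[] tuple : tuples) {
            if (groups.length() > 0) {
                groups.append(", ");
            }
            groups.append('(');
            for (int i = 0; i < tuple.length; i++) {
                if (i > 0) {
                    groups.append(", ");
                }
                groups.append('$').append(index++);
                values.add(tuple[i]); // null entries are kept as-is
            }
            groups.append(')');
        }
        return new Expanded(sql.replace(":" + name, groups.toString()), values);
    }

    public static void main(String[] args) {
        List<Object[]> tuples = Arrays.asList(
                new Object[] {"tag-1", "2024-01-01T00:00:00Z", 1.5},
                new Object[] {"tag-1", "2024-01-02T00:00:00Z", 2.5});
        Expanded expanded = expand(
                "INSERT INTO time_series_history (time_series_tag_xid, timestamp, value) VALUES :tuples",
                "tuples", tuples);
        // prints: INSERT INTO time_series_history (time_series_tag_xid, timestamp, value) VALUES ($1, $2, $3), ($4, $5, $6)
        System.out.println(expanded.sql);
    }
}
```

With two three-element tuples, all six values are bound to one statement, rather than one binding set per row as in the batching code above.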

After posting it looks like I should be able to bind the collection of parameters into the single statement. Thoughts?

A follow-up question: looking at the bindStatement method, I am trying to understand how to bind a List<Object[]> parameter.

mp911de commented 10 months ago

What does INSERT INTO time_series_history (time_series_tag_xid, timestamp, value) VALUES :tuples get rewritten to if you provide proper values?

chinshaw commented 10 months ago

> What does INSERT INTO time_series_history (time_series_tag_xid, timestamp, value) VALUES :tuples get rewritten to if you provide proper values?

If I understand your question correctly: the time-series insert works perfectly fine unless one of the parameters is null. For instance, a time step might be missing and we want to record its value as null (not a valid scenario for us, but it serves as an example). The bind call delegates to the public PostgresqlStatement bind(int index, Object value) function, which requires a non-null value and throws otherwise. My hack was to check for a null value and invoke bindNull rather than bind.

I was looking and it may be possible to create a BindMarkerFactory to handle this more gracefully than modifying the NamedParameterUtils class.
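To make the failure mode concrete, here is a minimal, self-contained Java sketch of the bind versus bindNull dispatch. It does not use Spring or the R2DBC driver: `RecordingStatement`, `TypedValue`, and `bindAll` are hypothetical stand-ins, with `RecordingStatement.bind` assumed to reject null the way the driver does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullAwareBindSketch {

    /** Hypothetical stand-in for a driver statement: bind() rejects null, bindNull() needs a type. */
    static final class RecordingStatement {
        final List<String> calls = new ArrayList<>();

        void bind(int index, Object value) {
            if (value == null) {
                throw new IllegalArgumentException("value must not be null");
            }
            calls.add("bind(" + index + ", " + value + ")");
        }

        void bindNull(int index, Class<?> type) {
            calls.add("bindNull(" + index + ", " + type.getSimpleName() + ")");
        }
    }

    /** Hypothetical holder that keeps the declared type even when the value is null. */
    static final class TypedValue {
        final Class<?> type;
        final Object value; // may be null

        TypedValue(Class<?> type, Object value) {
            this.type = type;
            this.value = value;
        }
    }

    /** Binds each value by position, routing typed nulls to bindNull. */
    static void bindAll(RecordingStatement statement, List<Object> values) {
        for (int i = 0; i < values.size(); i++) {
            Object candidate = values.get(i);
            if (candidate instanceof TypedValue) {
                TypedValue typed = (TypedValue) candidate;
                if (typed.value == null) {
                    statement.bindNull(i, typed.type);
                } else {
                    statement.bind(i, typed.value);
                }
            } else {
                // A raw null here fails, because no type is known for bindNull
                statement.bind(i, candidate);
            }
        }
    }

    public static void main(String[] args) {
        RecordingStatement statement = new RecordingStatement();
        bindAll(statement, Arrays.asList("2024-01-01", new TypedValue(String.class, null)));
        // prints: [bind(0, 2024-01-01), bindNull(1, String)]
        System.out.println(statement.calls);
    }
}
```

The point of the wrapper is that a bare null carries no type information, so the type has to travel alongside the value for bindNull to be callable.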

chinshaw commented 10 months ago

I should have started with a simple example test.

create table nullable_test (
    date                        TIMESTAMP WITH TIME ZONE NOT NULL,
    nullable_value              VARCHAR(255),
    PRIMARY KEY (date, nullable_value)
)

@Table("nullable_test")
data class NullableTestEntity(

    // Required timestamp for the entity.
    @Column("date")
    val date: OffsetDateTime,
    @Column("nullable_value")
    val nullableValue: String? = null
)

interface NullableTestRepository : CoroutineCrudRepository<NullableTestEntity, UUID> {
    @Query(
        """
        INSERT INTO nullable_test (date, nullable_value)
                VALUES :tuples
        ON CONFLICT(date, nullable_value) DO UPDATE SET
            date = EXCLUDED.date,
            nullable_value = EXCLUDED.nullable_value
            RETURNING *
        """
    )
    suspend fun upsert(tuples: List<Array<Any?>>): Flow<NullableTestEntity>
}

@OptIn(ExperimentalCoroutinesApi::class)
@ExtendWith(SpringExtension::class)
@Import(ApiTestConfiguration::class)
@DataR2dbcTest(excludeFilters = [ComponentScan.Filter(value = [SecurityConfiguration::class])])
class NullableTupleTest : R2DBCPostgresTestContainer() {

    @Autowired
    private lateinit var nullableTestRepository: NullableTestRepository

    @Test
    fun testSaveNotNull() = runTest {
        val entities = nullableTestRepository.upsert(
            listOf(
                arrayOf(OffsetDateTime.now(), "value1"),
                arrayOf(OffsetDateTime.now(), "value2")
            )
        )
        // onEach returns a new Flow, so it must be chained into collect()
        entities.onEach(::println).collect()
    }

    @Test
    fun testSaveNull() = runTest {
        val entities = nullableTestRepository.upsert(
            listOf(
                arrayOf(OffsetDateTime.now(), "value1"),
                arrayOf(OffsetDateTime.now(), null)
            )
        )
        // onEach returns a new Flow, so it must be chained into collect()
        entities.onEach(::println).collect()
    }
}

In the first test everything works fine but in the second "testSaveNull" I get this exception. This seems to be an issue where bindNull should be invoked rather than bind.

java.lang.IllegalArgumentException: value must not be null
    at io.r2dbc.postgresql.util.Assert.requireNonNull(Assert.java:71)
    at io.r2dbc.postgresql.PostgresqlStatement.bind(PostgresqlStatement.java:104)
    at io.r2dbc.postgresql.PostgresqlStatement.bind(PostgresqlStatement.java:58)
    at org.springframework.r2dbc.core.DefaultDatabaseClient$StatementWrapper.bind(DefaultDatabaseClient.java:544)
    at org.springframework.r2dbc.core.binding.IndexedBindMarkers$IndexedBindMarker.bind(IndexedBindMarkers.java:86)
    at org.springframework.data.r2dbc.core.NamedParameterUtils$ExpandedQuery.bind(NamedParameterUtils.java:543)
    at org.springframework.data.r2dbc.core.NamedParameterUtils$ExpandedQuery.bind(NamedParameterUtils.java:522)
    at org.springframework.data.r2dbc.core.NamedParameterUtils$ExpandedQuery.bindTo(NamedParameterUtils.java:599)
    at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery$ExpandedQuery.bindTo(StringBasedR2dbcQuery.java:226)
    at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$execute$2(DefaultDatabaseClient.java:334)
    at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$execute$3(DefaultDatabaseClient.java:374)
    at org.springframework.r2dbc.core.ConnectionFunction.apply(ConnectionFunction.java:46)
    at org.springframework.r2dbc.core.ConnectionFunction.apply(ConnectionFunction.java:31)
    at org.springframework.r2dbc.core.DefaultFetchSpec.lambda$all$2(DefaultFetchSpec.java:88)
    at org.springframework.r2dbc.core.ConnectionFunction.apply(ConnectionFunction.java:46)

mp911de commented 10 months ago

Thanks for the detail.

Have you tried wrapping your nullable values in Parameters.in(…)?


            listOf(
                // `in` is a Kotlin keyword, so the method name needs backticks
                arrayOf(OffsetDateTime.now(), Parameters.`in`(R2dbcType.VARCHAR, "value1")),
                arrayOf(OffsetDateTime.now(), Parameters.`in`(R2dbcType.VARCHAR, null))
            )

chinshaw commented 10 months ago

Ah yes, that works and is a much cleaner solution. Thanks Mark!