jillesvangurp / kt-search

Multi platform kotlin client for Elasticsearch & Opensearch with easily extendable Kotlin DSLs for queries, mappings, bulk, and more.
MIT License

[BUG] ktsearch.RestException doesn't seem to have a way of being handled for bulk requests #92

Closed by sumdog 7 months ago

sumdog commented 7 months ago

Describe the bug

I'm trying to replace some individual calls with bulk request calls. I had a try/catch for REST exceptions, but I'm not sure how that would work for bulk calls. Right now I'm seeing the following:

Exception in thread "main" com.jillesvangurp.ktsearch.RestException: RequestIsWrong 400: {"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"failed to parse field [status.poll.expires_at] of type [date] in document with id '28924386'. Preview of field's value: '11878-05-15T12:38:10.000Z'"}],"type":"mapper_parsing_exception","reason":"failed to parse field [status.poll.expires_at] of type [date] in document with id '28924386'. Preview of field's value: '11878-05-15T12:38:10.000Z'","caused_by":{"type":"illegal_argument_exception","reason":"failed to parse date field [11878-05-15T12:38:10.000Z] with format [strict_date_optional_time||epoch_millis]","caused_by":{"type":"date_time_parse_exception","reason":"date_time_parse_exception: Failed to parse with all enclosed parsers"}}},"status":400}
        at com.jillesvangurp.ktsearch.RestclientKt.asResult(Restclient.kt:96)
        at com.jillesvangurp.ktsearch.Request_dslKt.post(request-dsl.kt:95)
        at com.jillesvangurp.ktsearch.Request_dslKt$post$1.invokeSuspend(request-dsl.kt)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
        at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
        at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
        at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
        at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
        at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
        at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)

and my batch code looks like the following:

        stmt.executeQuery().use { resSet ->
            resSet.use { res ->
                client.bulk(callBack = itemCallBack, bulkSize = 10000) {
                    while (res.next()) {
                        val instanceName = res.getString("instance_name")
                        val retrievedAt = res.getTimestamp("retrieved_at")
                        val jsonString = res.getString("json_data")
                        val id = res.getLong("id")
                        val jsonData = Json.decodeFromString<JsonObject>(jsonString)

                        val model = StatusRecord(id=id.toString(), metaData = Metadata(instanceName, retrievedAt.toInstant().toKotlinInstant()), status = jsonData)

                        client.indexDocument(target = STAT_INDEX_NAME, id = id.toString(), document = model)
                    }
                }
            }
        }

I specify a callBack, but it doesn't seem to be called for exceptions. I could put a try/catch around the bulk call, but then I'd basically lose the entire batch, not just the invalid record (as I would if I ran the indexDocument statements individually outside of a batch). Now that I look at it, shouldn't OpenSearch send back an individual error instead of raising a REST exception?

I'm not sure if there is a solution to this now that I think about it.

Versions:

OpenSearch: 2.9.0
kt-search: com.jillesvangurp:search-client:2.1.1
Java: openjdk 17.0.8.1
OS: Linux

jillesvangurp commented 7 months ago

Thanks for reporting, I'll have to look into that. I haven't run into this myself yet but I could see this happening to me as well.

Adding some kind of REST error response would probably help. It's weird that it fails the entire request, though. It should just fail the item.

jillesvangurp commented 7 months ago

Actually, looking at your code, you are not doing a bulk request: you are making regular document index requests on the client inside a bulk session. You should use index(...) instead (inside the block, this is the BulkSession). The bad document will still fail for the same reason, but the failure should go through the callback for that item and not fail the other items.
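To make the difference concrete, here is a minimal, self-contained sketch of the receiver-scoping issue. The `Client` and `BulkSession` classes below are hypothetical stand-ins written for illustration, not the kt-search classes, and the blank-document check is only an assumed way of simulating a rejected document. The point is that a call on the outer client inside the block bypasses the session entirely, while an unqualified `index(...)` resolves against the session receiver and has its failures reported per item.

```kotlin
// Hypothetical stand-ins for illustration only; NOT the kt-search API.
class BulkSession(private val onItemFailed: (String) -> Unit) {
    private val buffer = mutableListOf<Pair<String, String>>()
    val indexed = mutableListOf<String>()

    // Queues a document; nothing is sent (or rejected) until flush().
    fun index(id: String, doc: String) {
        buffer.add(id to doc)
    }

    // Simulates the bulk response: bad items go to the callback, the rest succeed.
    fun flush() {
        for ((id, doc) in buffer) {
            if (doc.isBlank()) onItemFailed(id) else indexed.add(id)
        }
        buffer.clear()
    }
}

class Client {
    val directCalls = mutableListOf<String>()

    // Simulates a single index request: a bad document throws immediately.
    fun indexDocument(id: String, doc: String) {
        if (doc.isBlank()) throw IllegalStateException("400 for document $id")
        directCalls.add(id)
    }

    // Runs the block with a BulkSession as receiver, then flushes it.
    fun bulk(onItemFailed: (String) -> Unit, block: BulkSession.() -> Unit): List<String> {
        val session = BulkSession(onItemFailed)
        session.block()
        session.flush()
        return session.indexed
    }
}

// One deliberately bad (blank) document between two good ones.
val docs = listOf("1" to "ok", "2" to "", "3" to "ok")

// Correct usage: index(...) is called on the session, so only item "2" fails.
fun runBulkDemo(): Pair<List<String>, List<String>> {
    val client = Client()
    val failed = mutableListOf<String>()
    val indexed = client.bulk(onItemFailed = { id -> failed.add(id) }) {
        for ((id, doc) in docs) index(id, doc)
    }
    return indexed to failed
}

// The mistake from the issue: calling the client inside the block throws,
// aborts the loop, and never reaches the per-item callback.
fun directCallAborts(): Boolean {
    val client = Client()
    val failed = mutableListOf<String>()
    return try {
        client.bulk(onItemFailed = { id -> failed.add(id) }) {
            for ((id, doc) in docs) client.indexDocument(id, doc)
        }
        false
    } catch (e: IllegalStateException) {
        failed.isEmpty() // the callback was never invoked
    }
}

fun main() {
    println(runBulkDemo())
    println(directCallAborts())
}
```

In the real library the same idea should apply: inside the `client.bulk { ... }` block, queue documents with the session's own `index(...)` rather than calling `client.indexDocument(...)`.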

sumdog commented 7 months ago

Ah yep, that was it. I should have read the docs more carefully! 😅 Thanks!