ebean-orm / ebean

Ebean ORM
https://ebean.io
Apache License 2.0

ConcurrentModificationException in combination with Kotlin Coroutines and relation.fetch() #3141

Open davidhiendl opened 1 year ago

davidhiendl commented 1 year ago

There appears to be an issue with Kotlin coroutines and the loading of relationships that are not fetched via a join. Sometimes, when a relationship (in this example, .pendingActions) is accessed from multiple coroutines on different entities, a ConcurrentModificationException is thrown while the relationship is being initialized.

In testing I found an easy workaround for this problem: simply touching the relationship before processing the entities in parallel with coroutines prevents the issue.

If further information is required I'd be happy to provide it.

Edit: Ebean version 13.20.1

Stacktrace from actual project:

java.util.ConcurrentModificationException: null
        at java.base/java.util.ArrayList$Itr.checkForComodification(ArrayList.java:1013)
        at java.base/java.util.ArrayList$Itr.next(ArrayList.java:967)
        at de.dhswt.bsmxcd.erpconnectors.ottomp.services.ProductSyncService.processProductMapping(ProductSyncService.kt:404)
        at de.dhswt.bsmxcd.erpconnectors.ottomp.services.ProductSyncService.processProductMapping$default(ProductSyncService.kt:380)
        at de.dhswt.bsmxcd.erpconnectors.ottomp.services.ProductSyncService$processChanged$3$1$1.invokeSuspend(ProductSyncService.kt:864)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)
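The top two frames come from ArrayList's fail-fast iterator: `next()` calls `checkForComodification()`, which throws once the list has been structurally modified after the iterator was created. The sketch below reproduces that check deterministically on a single thread. In the issue, the modification is presumably made by another coroutine's thread while this one iterates (an assumption, not confirmed in this thread), which would explain why it only happens sometimes.

```kotlin
import java.util.ConcurrentModificationException

// Returns true if modifying an ArrayList while iterating it makes the
// iterator throw ConcurrentModificationException (ArrayList is fail-fast).
fun iteratorDetectsModification(): Boolean {
    val actions = arrayListOf("a", "b", "c")
    return try {
        for (action in actions) {
            // add() bumps the list's modCount; the iterator's next call to
            // next() sees the mismatch in checkForComodification() and throws.
            if (action == "a") actions.add("d")
        }
        false
    } catch (e: ConcurrentModificationException) {
        true
    }
}
```

Here the same thread causes the modification; with concurrent readers and a writer, the same check fires only when the timing lines up.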

Steps to reproduce

This is a simplified example taken from actual project code.

@Entity
@Table(name = "erpconn_example_product_mapping")
class ExampleProductMapping : TemporalAwareTenantDomainModel() {

    // ... irrelevant fields omitted
    @OneToMany(cascade = [CascadeType.REMOVE], fetch = FetchType.LAZY)
    val pendingActions: MutableList<ExamplePendingAction> = mutableListOf()

    override fun toString(): String {
        return "ExampleProductMapping(id=$id, product.id=${product?.id}, connector.id=${connector?.id}, lastChecked=$lastChecked, lastUpdated=$lastUpdated)"
    }

}
    suspend fun example() {
        val page = QExampleProductMapping(db)
            .pendingActions.fetch()
            .pendingActions.filterMany(
                QExamplePendingAction()
                    .status.eq(ExamplePendingAction.PendingActionStatus.PENDING)
                    .expressionList
            )
            .setMaxRows(100)
            .findPagedList()

        coroutineScope {
            for (productMapping in page.list) {
                launch(connector.coroutinePool) { // EDIT: this line launches multiple parallel coroutines that process the entities, up to the pool's maximum size
                    for (pendingAction in productMapping.pendingActions) { // <-- iterator sometimes triggers ConcurrentModificationException
                        // ... 
                    }
                }
            }
        }
    }

Adding a simple "touch" operation before splitting the work across coroutines prevents the error, because it forces the relations to load before any parallel access.

    suspend fun exampleWorkaround() {
        val page = QExampleProductMapping(db)
            .pendingActions.fetch()
            .pendingActions.filterMany(
                QExamplePendingAction()
                    .status.eq(ExamplePendingAction.PendingActionStatus.PENDING)
                    .expressionList
            )
            .setMaxRows(100)
            .findPagedList()

        // adding these 3 lines "fixed"/circumvented the problem, by triggering the loading before parallel processing of the list with coroutines
        for (productMapping in page.list) {
            productMapping.pendingActions.firstOrNull() // first() would throw on a mapping without pending actions
        }

        coroutineScope {
            for (productMapping in page.list) {
                launch(connector.coroutinePool) {
                    for (pendingAction in productMapping.pendingActions) { // <-- no longer triggers exception
                        // ...
                    }
                }
            }
        }
    }
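The workaround generalises to a small helper that touches every lazily loaded association on the calling thread before the list is handed to parallel workers. `preloadBeforeFanOut` is a hypothetical name, not an Ebean API, and the sketch assumes that whatever the `touch` lambda does (for example `firstOrNull()` on the collection, as above) is enough to trigger the load.

```kotlin
// Hypothetical helper (not part of Ebean): runs `touch` on every element
// sequentially on the calling thread, so any lazy loading it triggers has
// finished before the list is shared with parallel workers.
// Returns the same list so it can be used inline.
inline fun <E> List<E>.preloadBeforeFanOut(touch: (E) -> Unit): List<E> {
    for (element in this) touch(element)
    return this
}
```

In the example above this would read `page.list.preloadBeforeFanOut { it.pendingActions.firstOrNull() }` before entering `coroutineScope`.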
rbygrave commented 1 year ago

With the .pendingActions.fetch() we expect those to be fetched eagerly. From the above code, it looks like that is not happening. Have you looked at the SQL produced to confirm that the "secondary query" that fetches the pending actions is not executed (unless you add that "fix" code)?

The above uses findPagedList() but then doesn't use the total count. Is that because it was omitted to simplify the example? (Given the above, findList() would work, so it isn't clear why it isn't used.)

davidhiendl commented 1 year ago

The call to findPagedList() is an artifact of the simplification. In the real code it processes all pages (see the function below).

The eager fetch of pendingActions happens in a second query right after the main query executes, before the "workaround" code is called.

This is an example log of the executed queries. It appears that all relations marked for eager fetching are loaded before any of the entities are actually accessed; the first call in the page loop is the call to the workaround.

13:54:54.951 [Test worker @coroutine#1         ] DEBUG - io.ebean.SQL                         - txn[] select distinct t0.id, t0.last_checked, t0.last_updated, t0.enable, t0.product_version, t0.pricing_member_last_updated_version, t0.example_sku, t0.example_enable, t0.example_exists, t0.data_status, t0.example_quantity, t0.version, t0.modified_at, t0.created_at, t0.connector_id, t2.id, t2.scoped_attributes, t2.global_attributes, t2.version, t2.modified_at, t2.created_at, t2.parent_product_id, t1.id, t1.saleable_quantity, t1.reserved_quantity, t1.total_quantity, t1.version, t1.modified_at, t1.created_at from erpconn_examplemp_product_mapping t0 left join catalog_product t2 on t2.id = t0.product_id left join inventory_channel_product t1 on t1.id = t0.inventory_channel_product_id left join erpconn_examplemp_pending_action u1 on u1.product_mapping_id = t0.id left join pricing_member t3 on t3.product_id = t2.id where t0.connector_id = ? and (u1.status = ? or t0.enable != t0.example_enable or (t0.enable = ? and t0.example_exists = ?) or (t0.example_exists = ? and (FLOOR(t1.saleable_quantity) != t0.example_quantity or t0.pricing_member_last_updated_version is null or t0.pricing_member_last_updated_version != t3.last_updated_version or t0.product_version is null or t0.product_version != t2.version))) limit 100; --bind(38,PENDING,true,false,true) --micros(100,756)
13:54:54.983 [Test worker @coroutine#1         ] DEBUG - io.ebean.SQL                         - txn[] select t0.product_mapping_id, t0.id, t0.last_checked, t0.process_uuid, t0.status, t0.process_type, t0.update_reason, t0.version, t0.modified_at, t0.created_at, t0.product_mapping_id, t0.connector_id from erpconn_examplemp_pending_action t0 where (t0.product_mapping_id) = any(?) and t0.status = ?; --bind(Array[3]={4,7,18},PENDING) --micros(23,476)
13:54:55.006 [Test worker @coroutine#1         ] DEBUG - io.ebean.SQL                         - txn[] select t0.product_id, t0.id, t0.active, t0.identifier, t0.offer_url, t0.last_checked, t0.last_updated, t0.last_updated_version, t0.margin_min, t0.margin_def, t0.margin_max, t0.algorithm_price_mode, t0.constrain_upper_bound_msrp, t0.constrain_lower_bound_msrp_percent, t0.engine_state, t0.version, t0.modified_at, t0.created_at, t0.program_id, t0.product_id, t1.id, t1.valid_from, t1.valid_until, t1.current, t1.status, t1.status_message, t1.positions, t1.desired_position, t1.current_position, t1.buying_price, t1.msr_price, t1.last_price_change, t1.margin_min, t1.margin_def, t1.margin_max, t1.constrain_upper_bound_msrp, t1.constrain_lower_bound_msrp_percent, t1.algorithm_price_mode, t1.price_min, t1.price_def, t1.price_max, t1.price_cur, t1.version, t1.modified_at, t1.created_at, t1.member_id from pricing_member t0 left join pricing_history t1 on t1.member_id = t0.id where (t0.product_id) = any(?) and t0.program_id = ? and t1.current = ? order by t0.id; --bind(Array[3]={14,3,5},2true) --micros(22,195)
13:54:55.026 [Test worker @coroutine#1         ] TRACE - d.d.b.e.o.s.ProductSyncService       - workaroundTriggerCollectionLoad

I'm fairly certain this is caused by the logic that attaches the relations to the entities on read not being thread-safe, or some similar problem: removing coroutines from the equation makes the problem go away immediately. I'm not sure yet whether it's reproducible with regular threads.
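To make the suspected failure mode concrete, here is a minimal sketch of a lazily loaded collection that is safe to read from several threads: the first reader runs the loader under a lock, and the list is only published once it is fully populated, so no reader can iterate while another thread is still filling it. `LazyLoadedList` and `sumFromManyThreads` are hypothetical and do not describe how Ebean's BeanList actually initialises.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical lazily loaded collection (NOT Ebean's BeanList). The first reader
// runs `loader` while holding the lock and only publishes the list once it is
// fully populated, so no thread can iterate a half-filled list.
class LazyLoadedList<E>(private val loader: () -> List<E>) {
    private val lock = Any()
    private var backing: MutableList<E>? = null

    private fun loaded(): List<E> = synchronized(lock) {
        backing ?: ArrayList<E>().also { list ->
            list.addAll(loader()) // fill completely before publishing
            backing = list
        }
    }

    // Enables `for (e in lazyList)`; the published list is never modified again.
    operator fun iterator(): Iterator<E> = loaded().iterator()
}

// Usage demo: many tasks on a small pool iterate the same lazy list at once.
fun sumFromManyThreads(list: LazyLoadedList<Int>, tasks: Int = 32): List<Int> {
    val pool = Executors.newFixedThreadPool(8)
    try {
        val futures = (1..tasks).map {
            pool.submit(Callable {
                var sum = 0
                for (x in list) sum += x
                sum
            })
        }
        return futures.map { it.get() }
    } finally {
        pool.shutdown()
    }
}
```

Without the lock, two first readers could both see an empty `backing` and one could iterate a list the other is still appending to, which is exactly the kind of race that surfaces as a ConcurrentModificationException.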

I plan to dig into this further soon. I think the problem started after some recent Ebean/Kotlin updates, but unfortunately I can't pinpoint which ones yet, and again, I'm not 100% sure it is related.

Unfortunately, I haven't yet been able to reproduce the problem with the debugger attached.

inline fun <T, R, TQ : TQRootBean<T, R>> TQ.eachPage(block: (page: PagedList<T>) -> Unit) {
    var offset = 0
    do {
        this.setFirstRow(offset)
        val page = this.findPagedList()
        block(page)
        offset += page.pageSize
    } while (page.hasNext())
}
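The loop above can be exercised without a database by swapping in an in-memory stand-in for `setFirstRow()` + `findPagedList()`. `Page`, `fetchPage` and `forEachPage` are hypothetical names; like `eachPage`, the sketch only relies on each page exposing its page size and whether another page follows.

```kotlin
// Hypothetical in-memory stand-in for PagedList: only what eachPage() uses.
data class Page<T>(val list: List<T>, val pageSize: Int, val hasNext: Boolean)

// Hypothetical stand-in for setFirstRow(offset) followed by findPagedList().
fun <T> fetchPage(source: List<T>, offset: Int, pageSize: Int): Page<T> {
    val rows = source.drop(offset).take(pageSize)
    return Page(rows, pageSize, hasNext = offset + pageSize < source.size)
}

// Same do/while shape as eachPage(): advance by pageSize until hasNext is false.
inline fun <T> forEachPage(source: List<T>, pageSize: Int, block: (Page<T>) -> Unit) {
    require(pageSize > 0) { "pageSize must be positive, otherwise the loop never ends" }
    var offset = 0
    do {
        val page = fetchPage(source, offset, pageSize)
        block(page)
        offset += page.pageSize
    } while (page.hasNext)
}
```

With 10 rows and a page size of 4, the block runs three times, receiving rows 1-4, 5-8, and 9-10.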
davidhiendl commented 1 year ago

I also just found another, closely related issue that only occurs with coroutines in the mix:

When fetching a "chained" association (.product.pricingMembers.histories) with filterMany conditions, the resulting objects do not honor those conditions, and the log shows additional queries without the filter conditions (they appear to be single-entity lazy-loading queries). When I add these access paths to the workaround function, this does not happen either.

QExampleProductMapping(db)
    .connector.eq(connector.config)
    .product.fetch()
    .pendingActions.fetch()
    .pendingActions.filterMany(
        QExamplePendingAction()
            .status.eq(ExamplePendingAction.ExamplePendingActionStatus.PENDING)
            .expressionList
    )
    .inventoryChannelProduct.fetch()
    .alsoIf(connector.engineConfig.pricingProgram != null) { q ->
        q.product.pricingMembers.fetch()
        q.product.pricingMembers.filterMany(
            QPricingMember()
                .program.id.eq(connector.engineConfig.pricingProgram)
                .expressionList
        )

        q.product.pricingMembers.histories.fetch()
        q.product.pricingMembers.histories.filterMany(
            QPricingHistory()
                .current.isTrue
                .expressionList
        )
    }

private fun workaroundTriggerCollectionLoad(list: MutableList<ExampleProductMapping>) {
    logger.trace { "workaroundTriggerCollectionLoad" }
    for (mapping in list) {
        mapping.inventoryChannelProduct?.id
        mapping.pendingActions.firstOrNull()
        mapping.product
            ?.pricingMembers?.find { it.program?.id == connector.engineConfig.pricingProgram }
            ?.histories?.find { it.current }
    }
}
rob-bygrave commented 1 year ago

.alsoIf(connector.engineConfig.pricingProgram != null) { q ->

That looks handy, maybe we should add that to query beans ...

When I add these access paths to the workaround function it also does not happen.

And this all works fine without Kotlin Coroutines?

davidhiendl commented 1 year ago

@rob-bygrave If you limit the number of threads that process entities to 1, it works fine, even with coroutines.

Also, that function is just a Kotlin extension. I've got a ton of them for various common tasks, including some Ebean-specific ones. If you are interested, I can create a PR for them.

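import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract

// Runs block on the receiver only when condition is true; always returns the receiver.
@OptIn(ExperimentalContracts::class)
public inline fun <T> T.alsoIf(condition: Boolean, block: (T) -> Unit): T {
    contract {
        callsInPlace(block, InvocationKind.AT_MOST_ONCE)
    }
    if (condition) block(this)
    return this
}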
rbygrave commented 1 year ago

.alsoIf() ... just a Kotlin extension ... even some Ebean specific ones.

Yes, I'd look to include an .alsoIf() on the query bean root type so that it can be used by all (Java, Kotlin, etc.).

just a Kotlin extension ... even some Ebean specific ones.

Yes, I'm keen to see any Ebean-specific Kotlin extensions, even if we only end up documenting them. There was an ebean-kotlin project for the purpose of sharing Kotlin extensions, but it didn't get any PRs.

alsoIf() and Ebean queries

As I see it, the fluid style of Ebean query beans is a real strength. At the same time, it's a lot less nice when we can't maintain that fluid style. We have eqIfPresent(), gtIfPresent() ... [the *IfPresent expressions] plus inOrEmpty() to help us maintain that fluid style.

Adding an alsoIf(<predicate>, <consumer>) would be a nice way to help maintain that fluid style.
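A rough sketch of how such a method could sit on a query bean root type, assuming a self-typed generic base. `FluentQuery` and `ProductQuery` are hypothetical, not Ebean's TQRootBean, and predicates are plain strings for illustration. Taking a `java.util.function.Consumer` keeps the method usable from Java, while Kotlin callers still get a trailing lambda through SAM conversion; `eqIfPresent` shows the related *IfPresent idea in the same shape.

```kotlin
import java.util.function.Consumer

// Hypothetical self-typed query builder, NOT Ebean's TQRootBean. Every method
// returns the concrete type R so the chain stays fluent.
abstract class FluentQuery<R : FluentQuery<R>> {
    val predicates = mutableListOf<String>()

    @Suppress("UNCHECKED_CAST")
    private fun self(): R = this as R

    fun eq(property: String, value: Any): R {
        predicates += "$property = $value"
        return self()
    }

    // *IfPresent style: a null value adds no predicate and the chain continues.
    fun eqIfPresent(property: String, value: Any?): R =
        if (value == null) self() else eq(property, value)

    // alsoIf(<predicate>, <consumer>): applies `block` only when `condition` is true.
    fun alsoIf(condition: Boolean, block: Consumer<R>): R {
        if (condition) block.accept(self())
        return self()
    }
}

class ProductQuery : FluentQuery<ProductQuery>()
```

Because `alsoIf` returns `R` rather than the base type, calls after it still see `ProductQuery`'s own methods, which is what keeps the chain unbroken.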

.filterMany()

So seeing the filterMany() like this ...

.filterMany( QPricingHistory() ... .expressionList )

... I'm pretty sure we can support this in query beans with a closure, without needing to new up the query bean and return its expressionList, so I think there is a nice little improvement we can make here too.
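A minimal sketch of that closure-based shape, assuming a simplified association type that creates the nested query bean, passes it to the caller's closure, and extracts its expressions afterwards. `PendingActionsAssoc`, `PendingActionQuery` and the string-based expressions are hypothetical; this is not how Ebean represents an ExpressionList.

```kotlin
// Hypothetical nested query bean; expressions are plain strings for illustration.
class PendingActionQuery {
    val expressions = mutableListOf<String>()

    fun statusEq(status: String): PendingActionQuery {
        expressions += "status = $status"
        return this
    }
}

// Hypothetical association: filterMany creates the nested query bean itself,
// hands it to the closure, then extracts its expressions, so the caller writes
// neither `PendingActionQuery()` nor `.expressionList`.
class PendingActionsAssoc {
    var filter: List<String> = emptyList()
        private set

    fun filterMany(apply: (PendingActionQuery) -> Unit): PendingActionsAssoc {
        val nested = PendingActionQuery()    // created for the caller
        apply(nested)
        filter = nested.expressions.toList() // extracted for the caller
        return this
    }
}
```

Usage then reduces to `PendingActionsAssoc().filterMany { pa -> pa.statusEq("PENDING") }`.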

rbygrave commented 1 year ago

FYI: with https://github.com/ebean-orm/ebean/pull/3169 instead of

    .pendingActions.filterMany(
        QExamplePendingAction()
            .status.eq(ExamplePendingAction.PendingActionStatus.PENDING)
            .expressionList
    )

We can do

    .pendingActions.filterMany { pa ->
        pa.status.eq(ExamplePendingAction.PendingActionStatus.PENDING)
    }

... effectively the QExamplePendingAction() is created for us and the .expressionList is extracted for us.

davidhiendl commented 1 year ago

That should allow for more fluent-style query building.

Any idea what might be causing the concurrency issue yet? I've been digging deeper into the Ebean code to understand exactly how lazy field initialization happens, but so far I'm not sure why this is happening.

As a workaround, I've rewritten some of our queries and processing logic to fetch smaller pages and process each page on a single thread, with multiple pages processed in parallel. Not ideal, but it works for now. I would still prefer to find and fix the root cause.
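The page-per-thread workaround can be sketched with a plain executor: split the work into small pages, give each page to exactly one task, and let several pages run concurrently, so any given entity (and any lazy collection on it) is only touched by one thread. This is an illustration under assumptions: the in-memory `chunked()` stands in for issuing one Ebean query per page, and `processPagesInParallel` is a hypothetical name.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Splits `items` into pages of `pageSize`, hands each page to exactly one task,
// and runs up to `threads` pages concurrently. Within a page, items are processed
// sequentially on a single thread. Results keep the original item order.
fun <T, R> processPagesInParallel(
    items: List<T>,
    pageSize: Int,
    threads: Int,
    process: (T) -> R,
): List<R> {
    require(pageSize > 0 && threads > 0) { "pageSize and threads must be positive" }
    val pool = Executors.newFixedThreadPool(threads)
    try {
        val futures = items.chunked(pageSize).map { page ->
            pool.submit(Callable { page.map(process) }) // one page, one task
        }
        return futures.flatMap { it.get() }
    } finally {
        pool.shutdown()
    }
}
```

Compared with one coroutine per entity, this trades some parallelism for the guarantee that entities loaded together are never shared across threads.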

rbygrave commented 1 year ago

Any idea what might be causing the concurrency issue yet?

No, but I haven't looked at it yet.

Are you able to provide a failing test case, e.g. an example project on GitHub that people can run to reproduce the issue?