ebean-orm / ebean

Ebean ORM
https://ebean.io
Apache License 2.0

How to use streaming queries in mysql #3468

Closed src-slu closed 1 week ago

src-slu commented 2 weeks ago

Environment

springboot: 3.1.1
ebean: 13.25.2
mysql: 8.0.28
clickhouse: 23.10.3.5

Expected behavior

A streaming query over a large MySQL 8.0 table (about 8 million rows) should complete without loading the whole result set into memory. useCursorFetch=true is set in the MySQL URL. I tried the same approach with ClickHouse and it worked there.

Actual behavior

With the JVM heap set to 1024 MB, an OOM (OutOfMemoryError) occurs.

Steps to reproduce

  // some java code
String sql = """
        select date_format(str_to_date(a.SUMM_DATE, '%Y%m%d'), '%Y-%m-%d') as summ_date,
               a.license_code,
               null as product_code,
               a.stock_amt,
               a.sale_amt,
               a.bal_amt,
               a.sale_mny
        from SAL_SALESDATA a
        """;

DB_MYSQL.findDto(SalSaleData.class, sql)
                .findEach(1000, DB_CH::insertAll);
  // some logging output
java.util.concurrent.CompletionException: jakarta.persistence.PersistenceException: java.sql.SQLException: Java heap space
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: jakarta.persistence.PersistenceException: java.sql.SQLException: Java heap space
        at io.ebeaninternal.server.transaction.ImplicitReadOnlyTransaction.commit(ImplicitReadOnlyTransaction.java:527)
        at io.ebeaninternal.server.core.AbstractSqlQueryRequest.endTransIfRequired(AbstractSqlQueryRequest.java:62)
        at io.ebeaninternal.server.core.DefaultServer.findDtoEach(DefaultServer.java:1543)
        at io.ebeaninternal.server.querydefn.DefaultDtoQuery.findEach(DefaultDtoQuery.java:107)
        at com.idataway.pro.app.sync.SyncRepairService.lambda$repairData$0(SyncRepairService.java:53)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
        ... 6 common frames omitted
Caused by: java.sql.SQLException: Java heap space
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:807)
        at com.p6spy.engine.wrapper.ConnectionWrapper.commit(ConnectionWrapper.java:167)
        at io.ebean.datasource.pool.PooledConnection.commit(PooledConnection.java:558)
        at io.ebeaninternal.server.transaction.ImplicitReadOnlyTransaction.commit(ImplicitReadOnlyTransaction.java:525)
        ... 11 common frames omitted
Caused by: java.lang.OutOfMemoryError: Java heap space
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessageLocal(SimplePacketReader.java:133)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:102)
        at com.mysql.cj.protocol.a.SimplePacketReader.readMessage(SimplePacketReader.java:45)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:62)
        at com.mysql.cj.protocol.a.TimeTrackingPacketReader.readMessage(TimeTrackingPacketReader.java:41)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:66)
        at com.mysql.cj.protocol.a.MultiPacketReader.readMessage(MultiPacketReader.java:44)
        at com.mysql.cj.protocol.a.NativeProtocol.readMessage(NativeProtocol.java:515)
        at com.mysql.cj.protocol.a.ColumnDefinitionReader.read(ColumnDefinitionReader.java:72)
        at com.mysql.cj.protocol.a.ColumnDefinitionReader.read(ColumnDefinitionReader.java:40)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1587)
        at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:68)
        at com.mysql.cj.protocol.a.TextResultsetReader.read(TextResultsetReader.java:48)
        at com.mysql.cj.protocol.a.NativeProtocol.read(NativeProtocol.java:1600)
        at com.mysql.cj.protocol.a.NativeProtocol.readAllResults(NativeProtocol.java:1654)
        at com.mysql.cj.protocol.a.NativeProtocol.sendQueryPacket(NativeProtocol.java:1000)
        at com.mysql.cj.protocol.a.NativeProtocol.sendQueryString(NativeProtocol.java:933)
        at com.mysql.cj.NativeSession.execSQL(NativeSession.java:664)
        at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:795)
        at com.p6spy.engine.wrapper.ConnectionWrapper.commit(ConnectionWrapper.java:167)
        at io.ebean.datasource.pool.PooledConnection.commit(PooledConnection.java:558)
        at io.ebeaninternal.server.transaction.ImplicitReadOnlyTransaction.commit(ImplicitReadOnlyTransaction.java:525)
        at io.ebeaninternal.server.core.AbstractSqlQueryRequest.endTransIfRequired(AbstractSqlQueryRequest.java:62)
        at io.ebeaninternal.server.core.DefaultServer.findDtoEach(DefaultServer.java:1543)
        at io.ebeaninternal.server.querydefn.DefaultDtoQuery.findEach(DefaultDtoQuery.java:107)
        at com.idataway.pro.app.sync.SyncRepairService.lambda$repairData$0(SyncRepairService.java:53)
        at com.idataway.pro.app.sync.SyncRepairService$$Lambda$1521/0x0000000801b05358.run(Unknown Source)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
src-slu commented 2 weeks ago

I am wondering why an OOM can still occur when the query is streamed.

rbygrave commented 2 weeks ago

p6spy

Remove that.

And useCursorFetch=true in the mysql url.

Note that Ebean will automatically deal with MySQL particulars for streaming/iterator queries, using ResultSet.TYPE_FORWARD_ONLY and ResultSet.CONCUR_READ_ONLY.
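
For reference, a minimal raw-JDBC sketch of the two ways MySQL Connector/J is documented to avoid buffering a whole result set: row-by-row streaming (a forward-only, read-only statement with a fetch size of Integer.MIN_VALUE) and cursor-based fetching (useCursorFetch=true in the URL plus a positive fetch size). The class and method names are hypothetical, and this is not a description of Ebean's internals.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helper, for illustration only. */
final class MySqlStreamingSketch {

    /** Row-by-row streaming: rows are read from the server as rs.next() is called. */
    static PreparedStatement rowByRow(Connection conn, String sql) throws SQLException {
        PreparedStatement stmt = conn.prepareStatement(
                sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // Connector/J treats Integer.MIN_VALUE as the signal to stream the result.
        stmt.setFetchSize(Integer.MIN_VALUE);
        return stmt;
    }

    /** Cursor-based fetch: assumes useCursorFetch=true is present in the JDBC URL. */
    static PreparedStatement cursorFetch(Connection conn, String sql, int fetchSize)
            throws SQLException {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("fetchSize must be positive");
        }
        PreparedStatement stmt = conn.prepareStatement(
                sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        // Number of rows the driver asks the server for per round trip.
        stmt.setFetchSize(fetchSize);
        return stmt;
    }
}
```

With row-by-row streaming, the result set has to be read to the end or closed before the same connection can run another statement.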

src-slu commented 2 weeks ago

I found the setBufferFetchSizeHint method and used it together with findEach(batchsize, consumer); with it the OOM did not appear. Is there any performance difference between using setBufferFetchSizeHint and using findEach(batchsize, consumer) directly? As for p6spy, it is the project's SQL logging component; it may be turned off in the production environment.
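
On the distinction asked about here, a hedged reading of the two settings: setBufferFetchSizeHint is a hint that is passed on to the JDBC driver's fetch size, while the batch argument of findEach(batchsize, consumer) only controls how many mapped rows are handed to the consumer at a time. The sketch below shows that second, application-side grouping in plain Java. BatchingConsumer is a hypothetical class written for this illustration and is not Ebean's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of application-side batching; not Ebean code. */
final class BatchingConsumer<T> implements Consumer<T> {
    private final int batchSize;
    private final Consumer<List<T>> sink;
    private List<T> buffer = new ArrayList<>();

    BatchingConsumer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffers one row and hands a full batch to the sink once batchSize is reached. */
    @Override
    public void accept(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands any remaining rows to the sink; call once after the last row. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<T> full = buffer;
        buffer = new ArrayList<>();
        sink.accept(full);
    }
}
```

Grouping of this kind bounds how many mapped rows the application holds at once, but it does not by itself change how many rows the driver buffers from the server.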

src-slu commented 2 weeks ago
DB_MYSQL.findDto(Test.class, sql)
.setParameter("dateList", date)
.findEach(1000, DB_CH::insertAll)

datasource.mysql.driver=com.mysql.cj.jdbc.Driver
datasource.mysql.url=jdbc:mysql://xxx?allowPublicKeyRetrieval=true&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true

The oom still appears.

java.util.concurrent.CompletionException: jakarta.persistence.PersistenceException: java.sql.SQLException: Java heap space
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.exec(CompletableFuture.java:1796)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: jakarta.persistence.PersistenceException: java.sql.SQLException: Java heap space
        at io.ebeaninternal.server.transaction.ImplicitReadOnlyTransaction.commit(ImplicitReadOnlyTransaction.java:527)
        at io.ebeaninternal.server.core.AbstractSqlQueryRequest.endTransIfRequired(AbstractSqlQueryRequest.java:62)
        at io.ebeaninternal.server.core.DefaultServer.findDtoEach(DefaultServer.java:1543)
        at io.ebeaninternal.server.querydefn.DefaultDtoQuery.findEach(DefaultDtoQuery.java:107)
        at com.idataway.pro.app.sync.SyncRepairService.lambda$repairData$0(SyncRepairService.java:55)
        at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804)
        ... 6 common frames omitted
Caused by: java.sql.SQLException: Java heap space
        at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
        at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
        at com.mysql.cj.jdbc.ConnectionImpl.commit(ConnectionImpl.java:807)
        at io.ebean.datasource.pool.PooledConnection.commit(PooledConnection.java:558)
        at io.ebeaninternal.server.transaction.ImplicitReadOnlyTransaction.commit(ImplicitReadOnlyTransaction.java:525)
        ... 11 common frames omitted
Caused by: java.lang.OutOfMemoryError: Java heap space
rbygrave commented 1 week ago

Did you find the root cause for this issue? Can you share what it was?

src-slu commented 1 week ago

Sorry, there have been too many logs recently and they have been cleared. It was probably because I ran the stream processing on an asynchronous thread via CompletableFuture.

Thx!!!

rPraml commented 1 week ago

@jonasPoehler isn't that the same behaviour that we have observed? AFAIR, the mariadb driver loads the whole result into memory when there is an (update?) query on the same transaction while reading a result.

@src-slu can you do a small test to see whether it makes any difference if you avoid saves in findEach on the same transaction? Or even strip your use case down to raw jdbc. I'm relatively sure that this might be a limitation of mariadb.

src-slu commented 6 days ago

@rPraml I tried PostgreSQL and ClickHouse; with both, the streaming query completes on a large amount of data. Switching to MySQL causes the OOM. I have since changed my approach and the problem no longer occurs. I also just restored the code to the previous version, and the problem still does not occur, so I can no longer reproduce it.

src-slu commented 6 days ago

I tried a plain JDBC stream + batch approach for the data synchronization, which took 221s. The same synchronization with Ebean's stream + batch approach takes more than 2 hours. So I suspect that batching was not actually in effect when I used Ebean.

java code
@Transactional(batchSize = 500, getGeneratedKeys = false, batch = PersistBatch.ALL)
public void repairData(SyncRepairDto dto) {
    // todo something
            DB_MYSQL.findDto(table.getClz(), table.getSql().get())
                    .setParameter("dateList", date)
                    // process the data with a streaming query
                    .findEach(1000, DB::insertAll);
}
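
The raw JDBC code behind the 221s figure was not posted. As a point of comparison, here is a minimal sketch of what a JDBC stream + batch copy could look like, assuming the ResultSet comes from a streaming statement on the source connection and the PreparedStatement is a parameterized insert on a separate target connection. JdbcBatchCopySketch and its parameters are hypothetical names.

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical sketch of a stream + batch copy using plain JDBC. */
final class JdbcBatchCopySketch {

    /**
     * Reads rs row by row, binds the first `columns` columns to `insert`,
     * and executes a JDBC batch every `batchSize` rows.
     * Returns the number of rows copied.
     */
    static long copy(ResultSet rs, PreparedStatement insert, int columns, int batchSize)
            throws SQLException {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long rows = 0;
        int pending = 0;
        while (rs.next()) {
            for (int c = 1; c <= columns; c++) {
                insert.setObject(c, rs.getObject(c)); // JDBC columns are 1-based
            }
            insert.addBatch();
            rows++;
            if (++pending == batchSize) {
                insert.executeBatch();
                pending = 0;
            }
        }
        if (pending > 0) {
            insert.executeBatch(); // send the final partial batch
        }
        return rows;
    }
}
```

Because reads and writes use different connections in this sketch, the write side never issues a statement on the connection that is still streaming the result.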
src-slu commented 5 days ago

@jonasPoehler isn't that the same behaviour that we have observed? AFAIR, the mariadb driver loads the whole result into memory when there is an (update?) query on the same transaction while reading a result.

@src-slu can you do a small test to see whether it makes any difference if you avoid saves in findEach on the same transaction? Or even strip your use case down to raw jdbc. I'm relatively sure that this might be a limitation of mariadb.

The problem occurs on the Linux server. It does not occur locally, but it appears as soon as I deploy the same code to the server.

jdk versions
Linux:OpenJDK Runtime Environment Temurin-17.0.7+7 (build 17.0.7+7)
Local(Windows):OpenJDK Runtime Environment Temurin-17.0.10+7 (build 17.0.10+7)
java code
DB_MYSQL.sqlQuery(table.getSql().get())
                        .setParameter("dateList", date)
                        .findEach(sqlRow -> { /* todo */ });
src-slu commented 5 days ago

[image] This is the memory footprint using jdbc.
[image] This is the memory footprint using ebean. Memory soars when just entering foreach. The foreach process in the middle is the same.

java code -jdbc
// stmt is a PreparedStatement created earlier (not shown)
try (ResultSet rs = stmt.executeQuery()) {
    while (rs.next()) {
        // todo
    }
}

java code -ebean
DB_MYSQL.sqlQuery(sql)
                        .setParameter("dateList", date)
                        .findEach(sqlRow -> { /* todo */ });

Am I using Ebean's findEach incorrectly? There are not many examples of this on the Internet, so I can only follow the simple cases in the API documentation and the official Ebean documentation.

@rbygrave