nhl / link-move

A model-driven dynamically-configurable framework to acquire data from external sources and save it to your database.
Apache License 2.0

JDBC-level pagination/streaming of large queries #142

Open andrus opened 6 years ago

andrus commented 6 years ago

When reading large datasets (e.g. inside CreateOrUpdateTask.getRowReader() / JdbcExtractor.getReader()), we use a Cayenne iterator, but the underlying JDBC driver may still read the entire ResultSet into memory. This causes two problems:

On the Cayenne side, this should use SQLSelect.statementFetchSize(batchSize). But some DBs may require extra settings at the JDBC level for this flag to take effect. Specifically, MySQL requires this:

Statement stmt = conn.createStatement(
        ResultSet.TYPE_FORWARD_ONLY,
        ResultSet.CONCUR_READ_ONLY);
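For reference, MySQL Connector/J documents two ways to avoid buffering the whole ResultSet: row-by-row streaming, which needs a forward-only, read-only statement plus a fetch size of Integer.MIN_VALUE, and cursor-based fetching, where the URL has useCursorFetch=true and a positive fetch size is honored. The sketch below picks between the two. The class and method names are hypothetical, and the URL check is a naive substring match used only for illustration.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper: configure a Connector/J statement so it does not buffer all rows. */
final class MySqlStreaming {

    private MySqlStreaming() {}

    /**
     * Without useCursorFetch=true, Connector/J streams only when the fetch size is
     * Integer.MIN_VALUE (one row at a time). With useCursorFetch=true, a positive
     * fetch size makes the driver read that many rows per round trip from a server-side cursor.
     */
    static int fetchSizeFor(String jdbcUrl, int batchSize) {
        boolean cursorFetch = jdbcUrl.contains("useCursorFetch=true"); // naive check (assumption)
        return cursorFetch ? batchSize : Integer.MIN_VALUE;
    }

    /** Creates a forward-only, read-only statement with the matching fetch size. */
    static Statement streamingStatement(Connection conn, String jdbcUrl, int batchSize)
            throws SQLException {
        Statement stmt = conn.createStatement(
                ResultSet.TYPE_FORWARD_ONLY,
                ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(fetchSizeFor(jdbcUrl, batchSize));
        return stmt;
    }
}
```

Note that in the row-by-row mode the driver does not allow other statements on the same connection until the streamed ResultSet has been fully read or closed.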

So the goal of this task is either to find a transparent solution for JDBC-level result streaming, or to develop a set of simple recipes (e.g. via URL parameters) for the most common DB engines (MySQL, PostgreSQL, SQL Server), or to provide some combination of the two. (See a MySQL recipe in the comments below.)
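As a possible starting point for the recipe approach, here is a sketch of per-engine driver properties that control result streaming in MySQL Connector/J, PostgreSQL pgjdbc, and the Microsoft JDBC Driver for SQL Server. The property names should be checked against the driver version in use; the `StreamingRecipes` class and its engine keys are hypothetical.

```java
import java.util.Map;

/** Hypothetical catalog of streaming-related JDBC driver properties per engine. */
final class StreamingRecipes {

    private StreamingRecipes() {}

    static Map<String, String> urlParams(String engine, int fetchSize) {
        switch (engine) {
            case "mysql":
                // Connector/J: server-side cursor, fetchSize rows per round trip
                return Map.of(
                        "useCursorFetch", "true",
                        "defaultFetchSize", String.valueOf(fetchSize));
            case "postgresql":
                // pgjdbc: default fetch size for new statements; cursor-based fetching
                // additionally requires autocommit to be off on the connection
                return Map.of("defaultRowFetchSize", String.valueOf(fetchSize));
            case "sqlserver":
                // Microsoft JDBC driver: adaptive buffering reads rows as they are needed
                return Map.of("responseBuffering", "adaptive");
            default:
                throw new IllegalArgumentException("No recipe for engine: " + engine);
        }
    }
}
```

Two caveats: pgjdbc only fetches through a cursor when autocommit is off and the statement is forward-only, and the SQL Server driver takes properties separated by ';' (e.g. jdbc:sqlserver://host:1433;responseBuffering=adaptive) rather than a '?' query string. Adaptive buffering is already that driver's default in current versions.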

andrus commented 6 years ago

On MySQL, the magic sauce seems to be adding the useCursorFetch=true&defaultFetchSize=500 parameters to the JDBC URL, which switches the ResultSet to batch mode. Here are the before and after memory profiles:
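A minimal sketch of applying these parameters to an existing MySQL URL, whether or not it already has a query string; `withCursorFetch` is a hypothetical helper name, and 500 is simply the value used above.

```java
/** Hypothetical helper that appends the MySQL cursor-fetch parameters to a JDBC URL. */
final class MySqlUrls {

    private MySqlUrls() {}

    static String withCursorFetch(String jdbcUrl, int fetchSize) {
        // Continue an existing query string with '&', otherwise start one with '?'
        String separator = jdbcUrl.contains("?") ? "&" : "?";
        return jdbcUrl + separator + "useCursorFetch=true&defaultFetchSize=" + fetchSize;
    }
}
```

For example, `withCursorFetch("jdbc:mysql://localhost:3306/mydb", 500)` yields `jdbc:mysql://localhost:3306/mydb?useCursorFetch=true&defaultFetchSize=500`.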

[Image: before and after memory profiles (screenshot, 2018-02-08)]

vitalz commented 4 years ago

I experienced connection timeouts in a Delete task (a fairly slow stage listener meant the data took about 2-3 hours to process), even though the useCursorFetch=true&defaultFetchSize=500 parameters were on the JDBC URL. According to the log, the failure was on the first query, the one that selects all targets: com.nhl.link.move.runtime.task.delete.DeleteTask.createTargetSelect()

vitalz commented 4 years ago

Changing the behavior in a decorated DeleteTask resolves the connection timeout issue: when com.nhl.link.move.runtime.task.delete.DeleteTask.createTargetSelect() returns a com.nhl.link.move.runtime.task.delete.CollectionResultIterator as the org.apache.cayenne.ResultIterator, the job completes successfully.
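The essence of this workaround is trading memory for connection time: the targets are fetched into a list up front, so closing the iterator hours later does not have to commit a transaction on a long-idle connection (the commit in close() is where the stack trace below fails). A dependency-free sketch of the idea follows; `CloseableRows` stands in for Cayenne's ResultIterator and `ListRows` for CollectionResultIterator, and both are hypothetical.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for a closeable result iterator such as Cayenne's ResultIterator. */
interface CloseableRows<T> extends Iterable<T>, AutoCloseable {
    @Override
    void close(); // narrowed: no checked exception
}

/** Wraps rows that were already fetched, so iteration holds no JDBC resources. */
final class ListRows<T> implements CloseableRows<T> {
    private final List<T> rows;

    ListRows(List<T> rows) {
        this.rows = rows;
    }

    @Override
    public Iterator<T> iterator() {
        return rows.iterator();
    }

    @Override
    public void close() {
        // Nothing to release: the query, its connection and transaction
        // were finished when the list was fetched
    }
}
```

The downside is that all targets sit in memory at once, which is what JDBC-level streaming is meant to avoid, so this fits cases where the target set is of moderate size.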

Otherwise it may fail like this with MySQL and HikariCP:

[04/Jan/2020:06:44:09,864] bootique-job-1 KOaLIhs-80000000 ? WARN  c.z.h.p.ProxyConnection: <datasourceName> - Connection com.mysql.jdbc.JDBC4Connection@447be6c4 marked as broken because of SQLSTATE(08007), ErrorCode(0)
com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: Communications link failure during commit(). Transaction resolution unknown.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:422)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
    at com.mysql.jdbc.Util.getInstance(Util.java:387)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:898)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:887)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:862)
    at com.mysql.jdbc.ConnectionImpl.commit(ConnectionImpl.java:1621)
    at com.zaxxer.hikari.pool.ProxyConnection.commit(ProxyConnection.java:368)
    at com.zaxxer.hikari.pool.HikariProxyConnection.commit(HikariProxyConnection.java)
    at org.apache.cayenne.tx.TransactionConnectionDecorator.commit(TransactionConnectionDecorator.java:76)
    at org.apache.cayenne.tx.CayenneTransaction.processCommit(CayenneTransaction.java:84)
    at org.apache.cayenne.tx.BaseTransaction.commit(BaseTransaction.java:149)
    at org.apache.cayenne.access.TransactionResultIteratorDecorator.close(TransactionResultIteratorDecorator.java:68)
    at org.apache.cayenne.access.DataContext$DataRowResultIterator.close(DataContext.java:1247)
    at com.nhl.link.move.runtime.task.delete.DeleteTask.run(DeleteTask.java:60)
andrus commented 4 years ago

@vitalz : could you post your code changes that help to address the problem?

vitalz commented 4 years ago
package com.nhl.link.move.runtime.task.delete;

import java.lang.reflect.Field;
import java.util.Map;

import org.apache.cayenne.DataObject;
import org.apache.cayenne.ResultIterator;
import org.apache.cayenne.query.ObjectSelect;
import org.apache.commons.lang.NotImplementedException;

import com.nhl.link.move.batch.BatchProcessor;
import com.nhl.link.move.batch.BatchRunner;
import com.nhl.link.move.runtime.cayenne.ITargetCayenneService;
import com.nhl.link.move.Execution;
import com.nhl.link.move.SyncToken;

public final class DeleteTaskDecorator<T extends DataObject> extends DeleteTask<T> {
    private final DeleteTask<T> task;

    public DeleteTaskDecorator(DeleteTask<T> task) {
        // Dummy superclass arguments: this decorator delegates the real work to the wrapped task
        super(null, Integer.valueOf(0), null, null, null, null, null);
        this.task = task;
    }

    @Override
    public Execution run() {
        throw new NotImplementedException(); // fail fast
    }

    @Override
    public Execution run(@SuppressWarnings("rawtypes") Map params) {

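        try (
            // Name the execution after the wrapped task's extractor; the superclass field is null here
            @SuppressWarnings("unchecked")
            Execution execution = new Execution("DeleteTask:" + this.task.extractorName, params)) {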

            BatchProcessor<T> batchProcessor = this.createBatchProcessor(execution); 
            ResultIterator<T> data = this.createTargetSelect();

            try {
                BatchRunner.create(batchProcessor).withBatchSize(this.task.batchSize).run(data);
            } finally {
                data.close();
            }

            return execution;
        }
    }

    @Override
    public Execution run(SyncToken token) {
        throw new NotImplementedException(); // fail fast
    }

    @Override
    public Execution run(SyncToken token, Map<String, ?> params) {
        throw new NotImplementedException(); // fail fast
    }

    // Reads the private 'targetCayenneService' field declared in DeleteTask via reflection
    private ITargetCayenneService getTargetCayenneService() {
        try {
            // Look the field up on DeleteTask itself, so this also works if 'task' is a subclass
            Field f = DeleteTask.class.getDeclaredField("targetCayenneService");
            f.setAccessible(true);
            return (ITargetCayenneService) f.get(this.task);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
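    protected ResultIterator<T> createTargetSelect() {
        // Fetch all matching targets eagerly into a List, so no JDBC cursor or transaction
        // stays open while the (possibly slow) batches are processed
        ObjectSelect<T> query = ObjectSelect.query(this.task.type).where(this.task.targetFilter);
        return new CollectionResultIterator<>(query.select(getTargetCayenneService().newContext()));
    }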

    @Override
    protected BatchProcessor<T> createBatchProcessor(Execution execution) {
        return this.task.createBatchProcessor(execution);
    }

}

result = new DeleteTaskDecorator<MyEntity>(
      (DeleteTask<MyEntity>)
        lmRuntime.service(ITaskService.class)
        .delete(MyEntity.class)
        .sourceMatchExtractor(EXTRACTOR_PATH)
        .stageListener(processorService.processor(MyEntity.class, EXTRACTOR_PATH).failOnErrors().build())
        .matchBy(new CustomMapper())
        .batchSize(10000) // what limit should be there?
        .task()
    )
    .run(parameters)
    .createReport();
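On the batchSize question: in a batch runner, the batch size bounds how many objects are handed to the processor at a time, whereas with CollectionResultIterator the full target list is already in memory regardless of that value. The sketch below illustrates fixed-size batching over an iterator; `Batches.run` is a hypothetical helper, not LinkMove's BatchRunner.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch of fixed-size batching over an iterator. */
final class Batches {

    private Batches() {}

    /** Feeds the processor lists of at most batchSize items; returns the number of batches. */
    static <T> int run(Iterator<T> source, int batchSize, Consumer<List<T>> processor) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        int batches = 0;
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                processor.accept(batch);
                batches++;
                batch = new ArrayList<>(batchSize); // fresh list, so the processor may keep the old one
            }
        }
        if (!batch.isEmpty()) { // flush the last, partial batch
            processor.accept(batch);
            batches++;
        }
        return batches;
    }
}
```

With 25 items and a batch size of 10, the processor receives batches of 10, 10 and 5. Larger batches mean fewer processor calls (and commits, if each batch is committed) but more objects held per batch.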