j256 / ormlite-android

ORMLite Android functionality used in conjunction with ormlite-core
http://ormlite.com/
ISC License
1.59k stars 367 forks source link

Multi-threading issue with RxJava and ORMlite #80

Closed 4gus71n closed 2 years ago

4gus71n commented 7 years ago

Hi,

I'm from a team of developers and we are currently using ORMlite to implement our database layer in our App. We are running on some issues because we have two different methods, from two different database interactors that are using the same DAO, those methods are wrapped on a rx.Observable, so they execute asynchronously. One of our database inteactors is call FeedCache and the other one is call SchedulesCache. In some part of our code we are executing the following code:

mSchedulesCache.saveSchedule(someSchedule);
mFeedCache.saveFeed(someSchedule);

Each method implementation looks like this:

    @Override
    public void saveFeed(final String playerId, final BaseFeedItem feed) {
        Log.i(TAG, "DBFeedCache#saveFeed method executed");
        Log.i(TAG, "DBFeedCache#saveFeed the feed will be associated with: " + playerId);
        Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(final Subscriber<? super Object> subscriber) {
                try {
                    getHelper().getFeedDao().callBatchTasks(new Callable<Void>() {
                        @Override
                        public Void call() throws Exception {
                            long startTime = DateHelper.now().getTime();

                            saveFeedSql(feed, playerId);

                            Log.i(TAG, "DBFeedCache#saveFeed inserted feeds: " + feed.toString());
                            Log.i(TAG, "DBFeedCache#saveFeed time: " + (DateHelper.now().getTime() - startTime));
                            subscriber.onCompleted();
                            return null;
                        }
                    });
                } catch (Exception e) {
                    handleException(e);
                    subscriber.onCompleted();
                }
            }
        }).subscribeOn(Schedulers.io()).subscribe();
    }
    @Override
    public void saveSchedule(final String playerId,final Event eventToInsert) {
        Log.i(TAG, "DBPlayerCache#saveSchedule method executed");
        Observable.create(new Observable.OnSubscribe<Object>() {
            @Override
            public void call(final Subscriber<? super Object> subscriber) {
                long startTime = DateHelper.now().getTime();
                try {
                    saveScheduleSql(playerId, eventToInsert);

                    Log.i(TAG, "DBSchedulesCache#saveSchedule time:" + (DateHelper.now().getTime() - startTime));
                    subscriber.onNext(null);
                } catch (SQLException e) {
                    handleException(e);
                    subscriber.onError(e);
                } finally {
                    subscriber.onCompleted();
                }
            }
        }).subscribeOn(Schedulers.computation()).subscribe();
    }

The thing is that internally in those *Sql methods we end up calling this DAO getHelper().getEventDao().createOrUpdate(dbEventDTO); and that causes this exception to be raised:

java.lang.IllegalStateException: Exception thrown on Scheduler.Worker thread. Add `onError` handling.
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:57)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423)
   at java.util.concurrent.FutureTask.run(FutureTask.java:237)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
   at java.lang.Thread.run(Thread.java:818)
Caused by: rx.exceptions.OnErrorNotImplementedException: Unable to run insert stmt on object com.siplay.android_siplay.data.db.DbEventDTO@ebc3a75: INSERT INTO `dbeventdto` (`mAddress` ,`player_id` ,`mAwayRgb1` ,`mAwayRgb2` ,`mAwayScore` ,`mAwayTeam` ,`mAwayTeamId` ,`mAwayTeamLogo` ,`mCity` ,`mCountry` ,`mCustomTeamName` ,`mDescription` ,`mEndLocal` ,`mEndTs` ,`mEndUtc` ,`mHomeRgb1` ,`mHomeRgb2` ,`mHomeScore` ,`mHomeTeam` ,`mHomeTeamId` ,`mHomeTeamLogo` ,`mLat` ,`mLocResourceId` ,`mLocation` ,`mLon` ,`mName` ,`mOrigin` ,`id` ,`mStartLocal` ,`mStartTs` ,`mStartUtc` ,`mState` ,`mStatus` ,`mTeamId` ,`mTrackAttendance` ,`mType` ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
   at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:386)
   at rx.internal.util.InternalObservableUtils$ErrorNotImplementedAction.call(InternalObservableUtils.java:383)
   at rx.internal.util.ActionSubscriber.onError(ActionSubscriber.java:44)
   at rx.observers.SafeSubscriber._onError(SafeSubscriber.java:152)
   at rx.observers.SafeSubscriber.onError(SafeSubscriber.java:115)
   at rx.internal.operators.OperatorSubscribeOn$1$1.onError(OperatorSubscribeOn.java:59)
   at com.siplay.android_siplay.data.cache.db.DBSchedulesCache$1.call(DBSchedulesCache.java:50)
   at com.siplay.android_siplay.data.cache.db.DBSchedulesCache$1.call(DBSchedulesCache.java:39)
   at rx.Observable.unsafeSubscribe(Observable.java:10150)
   at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94)
   at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228)
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423) 
   at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) 
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
   at java.lang.Thread.run(Thread.java:818) 
Caused by: java.sql.SQLException: Unable to run insert stmt on object com.siplay.android_siplay.data.db.DbEventDTO@ebc3a75: INSERT INTO `dbeventdto` (`mAddress` ,`player_id` ,`mAwayRgb1` ,`mAwayRgb2` ,`mAwayScore` ,`mAwayTeam` ,`mAwayTeamId` ,`mAwayTeamLogo` ,`mCity` ,`mCountry` ,`mCustomTeamName` ,`mDescription` ,`mEndLocal` ,`mEndTs` ,`mEndUtc` ,`mHomeRgb1` ,`mHomeRgb2` ,`mHomeScore` ,`mHomeTeam` ,`mHomeTeamId` ,`mHomeTeamLogo` ,`mLat` ,`mLocResourceId` ,`mLocation` ,`mLon` ,`mName` ,`mOrigin` ,`id` ,`mStartLocal` ,`mStartTs` ,`mStartUtc` ,`mState` ,`mStatus` ,`mTeamId` ,`mTrackAttendance` ,`mType` ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
   at com.j256.ormlite.misc.SqlExceptionUtil.create(SqlExceptionUtil.java:25)
   at com.j256.ormlite.stmt.mapped.MappedCreate.insert(MappedCreate.java:137)
   at com.j256.ormlite.stmt.StatementExecutor.create(StatementExecutor.java:458)
   at com.j256.ormlite.dao.BaseDaoImpl.create(BaseDaoImpl.java:328)
   at com.j256.ormlite.dao.BaseDaoImpl.createOrUpdate(BaseDaoImpl.java:387)
   at com.siplay.android_siplay.data.cache.db.DBSchedulesCache.saveScheduleSql(DBSchedulesCache.java:69)
   at com.siplay.android_siplay.data.cache.db.DBSchedulesCache$1.call(DBSchedulesCache.java:44)
   at com.siplay.android_siplay.data.cache.db.DBSchedulesCache$1.call(DBSchedulesCache.java:39) 
   at rx.Observable.unsafeSubscribe(Observable.java:10150) 
   at rx.internal.operators.OperatorSubscribeOn$1.call(OperatorSubscribeOn.java:94) 
   at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:228) 
   at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) 
   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:423) 
   at java.util.concurrent.FutureTask.run(FutureTask.java:237) 
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) 
   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) 
   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) 
   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) 
   at java.lang.Thread.run(Thread.java:818) 
Caused by: java.sql.SQLException: inserting to database failed: INSERT INTO `dbeventdto` (`mAddress` ,`player_id` ,`mAwayRgb1` ,`mAwayRgb2` ,`mAwayScore` ,`mAwayTeam` ,`mAwayTeamId` ,`mAwayTeamLogo` ,`mCity` ,`...

If we avoid execute the line mSchedulesCache.saveSchedule(someSchedule); everything goes fine. The thing is that we want to be able to execute those two lines without problem. We know that the cause of this issue must be the rx.Observable's threads messing with the database transactions. Is there any solution to this? Maybe injecting the same thread for both rx.Observables? Has anyone experienced something like this?

Update:

I tried something like this:

public class SynchronizedDao<T, ID> extends BaseDaoImpl<T, ID> {
        public SynchronizedDao(Class<T> dataClass) throws SQLException {
            super(dataClass);
        }

        public SynchronizedDao(ConnectionSource connectionSource, Class<T> dataClass) throws SQLException {
            super(connectionSource, dataClass);
        }

        public SynchronizedDao(ConnectionSource connectionSource, DatabaseTableConfig<T> tableConfig) throws SQLException {
            super(connectionSource, tableConfig);
        }

        @Override
        public synchronized CreateOrUpdateStatus createOrUpdate(T data) throws SQLException {
            return super.createOrUpdate(data);
        }
    }
...
public Dao<DbEventDTO, Integer> getEventDao() throws SQLException {
        if (eventDao == null) {
            eventDao = new SynchronizedDao(connectionSource, DbEventDTO.class);
        }
        return eventDao;
    }
...

So the Dao object can be access by one object at the time and we avoid the issue, but what I'm getting now is this warning:

03-09 19:04:05.091 7642-7892/com.siplay.android_siplay W/SQLiteConnectionPool: The connection pool for database '+data+user+0+com_siplay_android_siplay+databases+sip_db' has been unable to grant a connection to thread 115842 (RxIoScheduler-5) with flags 0x1 for 16.002 seconds.
                                                                               Connections: 0 active, 1 idle, 0 available.
03-09 19:04:05.091 7642-7815/com.siplay.android_siplay W/SQLiteConnectionPool: The connection pool for database '+data+user+0+com_siplay_android_siplay+databases+sip_db' has been unable to grant a connection to thread 115812 (RxIoScheduler-2) with flags 0x1 for 16.001001 seconds.
                                                                               Connections: 0 active, 1 idle, 0 available.
03-09 19:04:05.421 7642-7862/com.siplay.android_siplay W/SQLiteConnectionPool: The connection pool for database '+data+user+0+com_siplay_android_siplay+databases+sip_db' has been unable to grant a connection to thread 115830 (RxIoScheduler-4) with flags 0x1 for 12.001 seconds.
                                                                               Connections: 0 active, 1 idle, 0 available.

Seems to save the data in the database fine. Anyone know why is throwing this warning?

afeozzz commented 7 years ago

@4gus71n Did you find the solution ?

4gus71n commented 7 years ago

@afeozzz What I did so far was providing a single thread Scheduler from my App module. I didn't push a release version with that change yet and I didn't see any change on the log. So I don't know if its 100% fixed.

j256 commented 2 years ago

Super old issue however in 5.1 we added synchronized keyword on the BaseDaoImpl.createOrUpdate() and createIfNotExists() methods. Hopefully that will protect against similar race conditions.