Open lucasjinreal opened 2 months ago
SQLite itself can't do concurrent writes. It's not a limitation of drift.
However the issue at hand is that the main thread is getting locked up. Drift tries to do most of the work on another thread, however some of the work is done on the main thread.
In extreme cases you can use computeWithDatabase
to perform ALL the work on another thread
See here
The one drawback to this is that watch
ed queries won't be triggered.
The solution for this is to enable shareAcrossIsolates
in your driftDatabase
e.g.
@DriftDatabase(tables: [TodoItems, TodoCategory])
class AppDatabase extends _$AppDatabase {
AppDatabase() : super(driftDatabase(name: 'my_database',native:DriftNativeOptions(shareAcrossIsolates: true)));
@override
int get schemaVersion => 1;
}
Can you post your database class?
@dickermoshe Thank you for the clarification. I can utilize isolation to run all operations on another thread. However, my other data stream listening to SQL will not be updated in the UI instantly. Does the computeWithDataBase
function ensure that the UI can listen for SQL changes immediately by simply allow shareAcrossIsolates
?
my data is a little bit complicated, my chat app mainly listen the data comes from server webscoket, and I need:
the message would UI free when data too much, and when the listen involves many topics write at the same time, the operation would junky.
for instance, my message_dao.dart have get messages operation like this:
Stream<List<DbMessageWithDbContactOrRoomMember>>
getMessagesWithContactByRoomId2(String roomId,
{int perPage = 25, int pageNum = 0}) {
final outerMessages = alias(messages, 'm');
final index = alias(messageIndex, 'idx');
final query = select(outerMessages).join([
leftOuterJoin(
index,
index.originalId.equalsExp(outerMessages.id) |
index.messageId.equalsExp(outerMessages.id)),
leftOuterJoin(contacts, contacts.id.equalsExp(outerMessages.fromId))
])
..limit((pageNum + 1) * perPage, offset: 0)
..where(outerMessages.roomId.equals(roomId))
..orderBy([
OrderingTerm(
expression: outerMessages.sendTime, mode: OrderingMode.desc)
]);
return query.watch().map((rows) {
final messageById = <String, DbMessage>{};
final childIdsByOriginalId = <String, List<String>>{};
for (final row in rows) {
final id = row.read(outerMessages.id)!;
final dbMessage = row.readTable(outerMessages);
messageById[id] = dbMessage;
final originalId = row.read(index.originalId);
final messageId = row.read(index.messageId);
if (originalId != null) {
final childIds = childIdsByOriginalId[originalId] ?? [];
if (messageId != null && !childIds.contains(messageId)) {
childIds.add(messageId);
}
childIdsByOriginalId[originalId] = childIds;
}
}
// debugPrint("----------------->>>>>>> $childIdsByOriginalId");
return rows.map((row) {
final id = row.read(outerMessages.id)!;
final dbMessage = messageById[id]!;
final originalId = row.read(index.originalId);
final original = originalId != null ? messageById[originalId] : null;
final childIds = childIdsByOriginalId[id] ?? [];
// debugPrint("----------------->>> $originalId $original $childIds");
final dbMessageExtra = DbMessageExtra.fromDbMessage(
dbMessage,
originalityMessage: original != null
? DbMessageExtra.fromDbMessage(original,
repliedMessages: null, originalityMessage: null)
: null,
repliedMessages: [
for (final childId in childIds)
if (messageById.containsKey(childId) && childId != id)
DbMessageExtra.fromDbMessage(messageById[childId]!,
repliedMessages: null, originalityMessage: null)
],
);
return DbMessageWithDbContactOrRoomMember(
dbMessageExtra,
contact: row.readTableOrNull(contacts),
);
}).toList();
});
}
this will uses a stream, which listen in my chat UI, every time I entered chat screen, it will load latest hisotry chats, and when new data comes, the UI will update automatically.
Now, when every time into chat Screen, if there are unread messages( I will set to read which caused a write sql), and query chat history same time, would get UI janky.
Any suggestions and solutions for it?
my database class looks like:
@DriftDatabase(tables: [
Contacts,
Users,
Messages,
Invitations,
UsersUranus,
RoomMember,
MessageIndex,
BoardData,
CommentData,
Reading,
NormalComment,
], daos: [
UserDao,
ContactDao,
MessageDao,
InvitationDao,
UserUranusDao,
RoomMemberDao,
MessageIndexDao,
BoardDataDao,
ReadingDao,
])
class MyDatabase extends _$MyDatabase {
static MyDatabase? _instance;
static MyDatabase? instance() {
_instance ??= MyDatabase._();
return _instance;
}
// we tell the database where to store the data with this constructor
MyDatabase._() : super(_openConnection());
// you should bump this number whenever you change or add a table definition. Migrations
// are covered later in this readme.
@override
int get schemaVersion => 20;
}
Also, want ask, if using Isolate.run
to do sql insert, will that shareIsolates works? such as:
Isolate.run(() {
DbMessage dbMessage;
dbMessage = event.toDbMessage();
kLog("[appData: toDb] [rssNews] ${dbMessage.toJson()}");
// Dont' touch messages, just save to Reading db
var r = DbReadingUtils.fromText(dbMessage.textClm);
MyDatabase.instance()!.readingDao.addSingle(r);
});
@lucasjinreal
can you show me the _openConnection()
function?
@lucasjinreal
You cannot pass a MyDatabase
instance between isolates.
You can only pass the underlying connection.
In your case you need to do the following:
import 'dart:async';
import 'package:drift/drift.dart';
import 'package:drift/isolate.dart';
@DriftDatabase(tables: [...], daos: [...])
class MyDatabase extends _$MyDatabase {
static MyDatabase? _instance;
static MyDatabase? instance() {
_instance ??= MyDatabase._();
return _instance;
}
// We can create the database with an existing connection if we want
MyDatabase._(QueryExecutor? e) : super(e ?? _openConnection());
// If you want to start a isolate with access to your database your cant use Isolate.run
// you have to use this
Future<T> withIsolate<T>(FutureOr<T> Function(AppDatabase) computation){
return computeWithDatabase(computation: computation, connect: (e) => AppDatabase._(e));
}
}
// If you would like to have queries synced between Isolates you must use `driftDatabase` from `drift_flutter`
// with shareAcrossIsolates: true
QueryExecutor _openConnection(){
return driftDatabase(name: 'my_database',native:DriftNativeOptions(shareAcrossIsolates: true));
}
void main(){
MyDatabase.instance().withIsolate((db){
db.... /// You are in a isolate!
});
}
It's usually simpler than this, but you're using a private constructor so it's a drop more complex
Also, want ask, if using
Isolate.run
to do sql insert, will that shareIsolates works? such as:Isolate.run(() { DbMessage dbMessage; dbMessage = event.toDbMessage(); kLog("[appData: toDb] [rssNews] ${dbMessage.toJson()}"); // Dont' touch messages, just save to Reading db var r = DbReadingUtils.fromText(dbMessage.textClm); MyDatabase.instance()!.readingDao.addSingle(r); });
No
The one drawback to this is that
watch
ed queries won't be triggered.
computeWithDatabase
should actually invalidate stream queries, this might be a bug.
You can't use Isolate.run
with a closure capturing a drift database because the drift database itself cannot be sent across isolates. Drift internally provides APIs allowing the underlying connection to be shared across isolates though - that is where computeWithDatabase
comes in.
What's important to understand is where the slowdown is actually coming from. It could be:
Attaching the profiler to your application and monitoring where CPU time is being spent in the main isolate should reveal the problem. Based on the symptoms you've described, I suspect it's the second issue.
A solution for the first issue can be to use multiple database workers as readers with WAL mode, for instance by enabling readPool
in NativeDatabase.createInBackground
.
The second issue is harder to fix, solutions include:
computeWithDatabase
to reduce work on the main isolate.limit
and pagination if you don't need all rows at once. This is especially true when you have a stream that updates frequently.@simolus3 thank u for your always support.
Yes. I am actually should on the 2rd issue. And I think I didn't uses batch all the time. For instance, when entering the chat screen, the message displayed flag for read I actually update it one by one. Does there any examples could be reference for batch updating since am using customized data structure.
Another could caused is selecting operation is time consuming, but am ready used limit, a very limited rows were selected each time. I still think the operation on qeury and the same time inserting data, caused some sort of waiting lock in sql, could that be a case? how to resolve it , this is why I ask could I let inserting data be isolated.
@dickermoshe May I ask, how could I modify my data dao to support computeWithDatabase? Need I change all Dao operation except the database class itself?
Another could caused is selecting operation is time consuming, but am ready used limit, a very limited rows were selected each time. I still think the operation on qeury and the same time inserting data, caused some sort of waiting lock in sql, could that be a case? how to resolve it , this is why I ask could I let inserting data be isolated.
This wouldn't lock up the main thread. If your UI is janky, the this is definitely not the issue.
Yes. I am actually should on the 2rd issue. And I think I didn't uses batch all the time. For instance, when entering the chat screen, the message displayed flag for read I actually update it one by one. Does there any examples could be reference for batch updating since am using customized data structure.
After each write operation (UPDATE/DELETE,CREATE), events are sent to every watch operation which was potentially affected by it. So if you you find yourself doing something like this:
// Use a streamed query
db.select(userTable).watch().listen((){
print("Something Changed!)
})
// Update something 1 at a time
for (final i in users){
db.update(...)
}
The it would make sense that the main Isolate is getting slowed down. Te UI is rebuilding 100s of times a second!
The solution is to wrap operation in a transaction, that way events are only sent out at the end:
await transaction((){
for (final i in users){
await db.update(...)
}
})
Now only one event is sent.
@dickermoshe May I ask, how could I modify my data dao to support computeWithDatabase? Need I change all Dao operation except the database class itself?
If you have a operation creating or retrieving lots of ways, just use db.computeWithDatabase
in your dao
class SomeDao extends DatabaseAccessor<TodoDb> with _$SomeDaoMixin {
SomeDao(super.db);
foo()async{
await db.withIsolate( (db) {
// Do work here
});
}
}
@lucasjinreal Have you tried using the flutter dev tools to see which function is locking up the main thread. If you got lots of stuff going on in the background, it could be very hard to find what it is by poking around blindly
@dickermoshe thank u sir.
I debug several years, found the janky root reason, might caused by this messags query function:
Stream<List<DbMessageWithDbContactOrRoomMember>>
getMessagesWithRoomMemberByRoomId2(String roomId,
{int perPage = 25, int pageNum = 0}) {
final outerMessages = alias(messages, 'm');
final index = alias(messageIndex, 'idx');
final query = select(outerMessages).join([
leftOuterJoin(
index,
index.originalId.equalsExp(outerMessages.id) |
index.messageId.equalsExp(outerMessages.id)),
leftOuterJoin(
roomMember,
roomMember.id.equalsExp(outerMessages.fromId) &
roomMember.roomId.equalsExp(outerMessages.roomId))
])
..limit((pageNum + 1) * perPage, offset: 0)
..where(outerMessages.roomId.equals(roomId))
..orderBy([
OrderingTerm(
expression: outerMessages.sendTime, mode: OrderingMode.desc)
]);
try {
return query.watch().map((rows) {
final messageById = <String, DbMessage>{};
final childIdsByOriginalId = <String, List<String>>{};
for (final row in rows) {
final id = row.read(outerMessages.id)!;
final dbMessage = row.readTable(outerMessages);
messageById[id] = dbMessage;
final originalId = row.read(index.originalId);
final messageId = row.read(index.messageId);
if (originalId != null) {
final childIds = childIdsByOriginalId[originalId] ?? [];
if (messageId != null && !childIds.contains(messageId)) {
childIds.add(messageId);
}
childIdsByOriginalId[originalId] = childIds;
}
}
// debugPrint("----------------->>>>>>> $childIdsByOriginalId");
return rows.map((row) {
final id = row.read(outerMessages.id)!;
final dbMessage = messageById[id]!;
final originalId = row.read(index.originalId);
final original = originalId != null ? messageById[originalId] : null;
final childIds = childIdsByOriginalId[id] ?? [];
// debugPrint("----------------->>> $originalId $original $childIds");
final dbMessageExtra = DbMessageExtra.fromDbMessage(
dbMessage,
originalityMessage: original != null
? DbMessageExtra.fromDbMessage(original,
repliedMessages: null, originalityMessage: null)
: null,
repliedMessages: [
for (final childId in childIds)
if (messageById.containsKey(childId) && childId != id)
DbMessageExtra.fromDbMessage(messageById[childId]!,
repliedMessages: null, originalityMessage: null)
],
);
return DbMessageWithDbContactOrRoomMember(
dbMessageExtra,
roomMember: row.readTableOrNull(roomMember),
);
}).toList();
});
} catch (e) {
debugPrint("Error occurred: $e");
// Handle the error as needed, e.g., return an empty stream or rethrow
return const Stream.empty();
}
}
Can u teach me what's the actually reason why this function is janky? And what possible the way to make it more faster?
How often is this getting triggered? do this:
final stream = getMessagesWithRoomMemberByRoomId2(...)
stream.listen((_){
print(DateTime.now().millisecondsSinceEpoch);
});
// Use the stream
How often are you seeing a read?
this is only run when entering the screen, and when new messages comes, the stream would update.
I casually got janky when entering screen, don't know if the message num getting big, the process got stuck
Yes, but either this stream is triggered by other opperations too often, or the stream is being recreated on every build
please try this and report back
How often is this getting triggered? do this:
final stream = getMessagesWithRoomMemberByRoomId2(...) stream.listen((_){ print(DateTime.now().millisecondsSinceEpoch); }); // Use the stream
How often are you seeing a read?
@dickermoshe hi, i logged the stream, it prints expected every time messages updates (sending or erceiveing)
is that normal
bytw, here is my openconnect previous:
LazyDatabase _openConnection() { // the LazyDatabase util lets us find the right location for the file async. return LazyDatabase(() async { final dbFolder = await getApplicationDocumentsDirectory(); final file = File(p.join(dbFolder.path, 'chat2.sqlite')); if (!await file.exists()) { debugPrint('!!!!!!!!!!!!! sql not exist! create it.'); dbFolder.create(); } // return NativeDatabase(file); return NativeDatabase.createInBackground( file, setup: (rawDb) { rawDb.execute('PRAGMA journal_mode=WAL;'); }, ); }); }
how can i use corssIsolate.
Using createInBackground
should be enough to run queries in the background isolate. How many rows are you listening on? Are you using a StreamBuilder
to listen on the stream in the widget (if not, a common mistake is to forget listening on the stream - if they pile up the queries would run unnecessarily often)?
The query doesn't appear to be expensive. Does adding watch().distinct(const ListEquality()).map(...
help if the mapper is too expensive?
@simolus3 I using createInBackground, and withIsolate to insert, the UI didn't update.
may ask what's the distinct superior compares to normal way
distinct
would makes the stream not update if nothing has actually changed.
hi, i logged the stream, it prints expected every time messages updates (sending or erceiveing)
How often is that? We are looking for something which is running too often.
Use the flutter devtools to find what is causing this
There is an application that may have multiple locations where SQL is written simultaneously. However, this has caused the user interface to become stuck every time. I am aware that Flutter operates on a single thread. Consequently, every SQL write operation is performed within an await future.
I am curious about the following:
Background: My application is a chat app that listens to a websocket for message updates. This occurs when the app is opened or resumed. New messages may arrive, and each time new data arrives, the UI becomes unresponsive (or very laggy). (The messages have two types, one is normal chat, and the other is news. They are listened to simultaneously, so they could be written at a very close time.)
However, currently, I am experiencing this unresponsive behavior every single time.
I have already opened the database with isolate.