Open KotoriMinami opened 3 months ago
Thanks for evaluating that, @KotoriMinami. Do you have any solution in mind?
@diegolmello Maybe this is possible?
consumer = async () => {
// copy variables values to local and clean them
const _lastOpen = this.lastOpen;
const _queue = Object.keys(this.queue).map(key => this.queue[key]);
this._messagesBatch = this.messagesBatch;
this._threadsBatch = this.threadsBatch;
this._threadMessagesBatch = this.threadMessagesBatch;
this.queue = {};
this.messagesBatch = {};
this.threadsBatch = {};
this.threadMessagesBatch = {};
for (let i = 0; i < _queue.length; i += 1) {
try {
// eslint-disable-next-line no-await-in-loop
await this.updateMessage(_queue[i]);
} catch (e) {
log(e);
}
}
try {
const db = database.active;
await db.write(async () => {
await db.batch(
...Object.values(this._messagesBatch),
...Object.values(this._threadsBatch),
...Object.values(this._threadMessagesBatch)
);
});
this.read(_lastOpen);
} catch (e) {
log(e);
}
// Clean local variables
this._messagesBatch = {};
this._threadsBatch = {};
this._threadMessagesBatch = {};
this.timer = null;
if (Object.keys(this.queue).length) {
this.timer = setTimeout(this.consumer, WINDOW_TIME);
}
}
handleMessageReceived = (ddpMessage: IDDPMessage) => {
if (!this.timer) {
this.timer = setTimeout(this.consumer, WINDOW_TIME);
}
this.lastOpen = new Date();
const message = buildMessage(EJSON.fromJSONValue(ddpMessage.fields.args[0])) as IMessage;
this.queue[message._id] = message;
};
Additionally, I'm not quite sure about the role of this.messagesBatch
in the logic.
@KotoriMinami Thanks. That's promising!
This code is a little bit harder to read than it should, because of a performance improvement to make messages to be saved in batches of 1 second. We can reevaluate if it's really needed though, since it was implemented a long time ago.
Without it, every message coming from websocket would be saved on WatermelonDB directly.
If there was too many messages coming from websocket at the same time, it would lead to even more Diagnostic error
you reported.
Can you remove everything related to the queue and give it a try?
I'm pretty sure you could just call updateMessage
on handleMessageReceived
.
@diegolmello Yes, I think using a queue to handle this logic is reasonable.
Once you use the prepare
related APIs, any subsequent updates before executing db.batch
are invalid.
Therefore, it's best to ensure that the next prepare
runs after the batch is completed, otherwise, the latest updates might be lost.
I'm confident that using a queue will be much more effective than directly calling updateMessage within handleMessageReceived. I only had this question because I noticed that this.messagesBatch
was continuously assigned as {},
and then this._messagesBatch = this.messagesBatch
was executed.
It feels like it's not really being utilized in the process.
You can try simulating a time consuming operation before db.write
, for example:
for (let i = 0; i < _queue.length; i += 1) {
try {
// eslint-disable-next-line no-await-in-loop
await this.updateMessage(_queue[i]);
} catch (e) {
log(e);
}
}
await new Promise((resolve, reject) => {
setTimeout(resolve, 4000);
});
try {
const db = database.active;
await db.write(async () => {
await db.batch(
...Object.values(this._messagesBatch),
...Object.values(this._threadsBatch),
...Object.values(this._threadMessagesBatch)
);
});
this.read(_lastOpen);
} catch (e) {
log(e);
}
If a new websocket push is received during the waiting period,
you can see that this._messagesBatch
actually gets lost because it’s overwritten by the next timer.
This results in the messages from the previous queue not receiving any further updates.
Of course, this simulated scenario is quite extreme, and the likelihood of it happening in reality is extremely low.
After a simple test, the above code is able to handle this situation without losing the latest updates. However, I'm not sure about its robustness.
After all, the current code logic has been running for four years.
Description:
Some messages within the conversation encounter the following issues.
Environment Information:
Steps to reproduce:
Expected behavior:
Within the conversation, messages can be updated and further actions can be performed.
Actual behavior:
There's a probability that a certain batch of messages may not update correctly.
Additional context:
After examining the logs, it is suspected that the issue is caused by the following:
In the https://github.com/RocketChat/Rocket.Chat.ReactNative/blob/bb59d980b23484a7897611de77d6847bcfad77f4/app/lib/methods/subscriptions/room.ts#L314 method that handles message updates over a certain period of time. All creations and updates are stored in https://github.com/RocketChat/Rocket.Chat.ReactNative/blob/bb59d980b23484a7897611de77d6847bcfad77f4/app/lib/methods/subscriptions/room.ts#L244
If there is a slightly long-running task when executing https://github.com/RocketChat/Rocket.Chat.ReactNative/blob/bb59d980b23484a7897611de77d6847bcfad77f4/app/lib/methods/subscriptions/room.ts#L332
before reaching
db.batch
, https://github.com/RocketChat/Rocket.Chat.ReactNative/blob/bb59d980b23484a7897611de77d6847bcfad77f4/app/lib/methods/subscriptions/room.ts#L341 because https://github.com/RocketChat/Rocket.Chat.ReactNative/blob/bb59d980b23484a7897611de77d6847bcfad77f4/app/lib/methods/subscriptions/room.ts#L327 the next timer execution may be triggered.At this point, https://github.com/RocketChat/Rocket.Chat.ReactNative/blob/bb59d980b23484a7897611de77d6847bcfad77f4/app/lib/methods/subscriptions/room.ts#L320 the creation and updates from the previous batch will be lost, causing them to not reach the
db.batch
method.As a result, Model._preparedState will remain in an executing state in memory and will never recover. Consequently, this batch of messages will not receive any updates.