chime3 closed this 1 month ago
No, this is incorrect.
The items range tracker stores placeholder strings that could be anything: page tokens, timestamps, or record IDs. It doesn't need to be changed, as it's flexible and can handle all of these scenarios.
In the Gmail handler we use a mixture of page tokens and record IDs.
The Telegram handler uses record IDs (which is my preference, as they're exact).
I have added feedback on the Google Calendar handler, as it should also be using page tokens and record IDs instead of timestamps.
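To illustrate the point about placeholder strings, here is a minimal sketch of the kind of range shape the tracker can hold. The names `ItemRange`, `startId`, and `endId` are assumptions for illustration and may not match the actual implementation.

```ts
// Hypothetical sketch: the tracker only stores opaque boundary strings,
// so the same shape works for page tokens, timestamps, or record IDs.
interface ItemRange {
  startId: string; // e.g. a record ID, a page token, or an ISO timestamp
  endId: string;   // same format as startId for a given provider
}

// Examples of ranges the tracker could hold, depending on the provider:
const recordIdRange: ItemRange = { startId: "50", endId: "40" };
const pageTokenRange: ItemRange = { startId: "zAFDFIadf", endId: "cAddFIadf" };
const timestampRange: ItemRange = {
  startId: "2024-05-01T00:00:00Z",
  endId: "2024-04-01T00:00:00Z",
};
```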
API has [10,9,8,7,6,5,4,3,2,1] (where 1 = oldest, 10 = newest). Batch size = 5.

Batch 1: processing newest records [10,9,8,7,6], completed ranges = [10,6]

Let's assume 24 hours pass before the next batch and 20 more records are created, so the API has [30,29,28,27,26,25,24,23,22,21,20,19,18,17,16,15,14,13,12,11,10,9,8,7,6,5,4,3,2,1]

Batch 2: processing newest records [30,29,28,27,26], completed ranges = [30,26] [10,6]
When the time between batch processing results in more items being generated than the batch size, we end up with gaps between completed ranges. This is why we need the items range tracker.
We can't assume all items can be processed at once. For example, processing email may involve thousands of records, and we can't necessarily lock up a server to fetch them all at once. We also have to assume a server could go down, an API could go down, or other issues could leave batches not fully completed. We need the items range tracker to track our progress so we have a robust solution that can handle those failures.
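As a rough sketch of the scenario above (the shape and names here are illustrative, not the actual implementation), the completed ranges after the two batches look like this:

```ts
// Illustrative only: track completed ranges as [newest, oldest] pairs and
// observe the gap that appears when more items arrive than one batch can cover.
type CompletedRange = [newest: number, oldest: number];

const completedRanges: CompletedRange[] = [];

// Batch 1: API has records 1..10, batch size 5, newest first.
completedRanges.unshift([10, 6]); // -> [[10, 6]]

// 24 hours later, records 11..30 exist as well.
// Batch 2: again only the 5 newest records fit in the batch.
completedRanges.unshift([30, 26]); // -> [[30, 26], [10, 6]]

// Records 25..11 and 5..1 are not covered by any range yet; those gaps are
// exactly what the items range tracker has to remember and backfill later.
```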
- We want to prioritize the most recently changed items first.
- We should avoid timestamps where possible, because you can have multiple records with the same timestamp.
- Some APIs offer pageTokens, which guarantee the next set of page results is sequential from the previous page. That is perfect for syncing data without missing any records, so ideally we use pageTokens where available (see the sketch after this list).
- We need to support syncing data in batches, which means we end up with gaps between the batches of fetched data that need to be managed.
- Completed ranges = records that have already been processed.
- We need to allow for the fact that updates may be more complete.
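As a rough illustration of the pageToken approach, here is a sketch of a batched fetch loop. The call and field names (`listMessages`, `nextPageToken`, `pageToken`, `maxResults`) mirror the common Google-style pagination convention but are assumptions here, not the actual handler code.

```ts
// Hypothetical sketch of batched fetching with page tokens. Each page is
// guaranteed to follow on from the previous one, so no records are skipped;
// we stop when the batch budget is used up and remember how far we got.
async function fetchBatch(client: any, batchSize: number, startToken?: string) {
  let pageToken = startToken;
  let fetched = 0;
  const items: unknown[] = [];

  while (fetched < batchSize) {
    // `listMessages` is a placeholder for whatever provider call is wrapped.
    const page = await client.listMessages({ pageToken, maxResults: batchSize - fetched });
    const pageItems = page.items ?? [];
    items.push(...pageItems);
    fetched += pageItems.length;

    if (!page.nextPageToken || pageItems.length === 0) break; // no more data available
    pageToken = page.nextPageToken;
  }

  // `pageToken` now marks where the next batch should resume from.
  return { items, nextStartToken: pageToken };
}
```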
Assume we have completed item ranges: [0-10] [15-20] [25-50]
Assume we have completed item ranges: [100 - 90] [80 - 70] [60 - 40]
Assume we have completed item ranges: [100 - zAFDFIadf] [80 - b1cFDFIadf] [60 - cAddFIadf]
Calling `nextRange()` gives us the start and end ID of the next range that needs to be filled, so it returns [11 - 15], where `startId=11` and `endId=15`.
Then the handler code should iterate through all results starting at 11, and when 15 is hit, it must break.
So, if the numbers above are recordIds, we would effectively loop from 11-15, and when we hit recordId = 15, the loop should end.
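A minimal sketch of that handler loop, assuming record IDs; `tracker`, `fetchRecordsFrom`, `processRecord`, and `completeRange` are hypothetical placeholders, not the actual handler code:

```ts
// Placeholders for the provider-specific fetch and processing calls.
declare function fetchRecordsFrom(startId: string): Promise<Array<{ id: string }>>;
declare function processRecord(record: { id: string }): Promise<void>;

// Illustrative sketch only: fill the gap returned by nextRange() and stop at endId.
async function backfillNextRange(tracker: any): Promise<void> {
  const range = tracker.nextRange(); // e.g. { startId: "11", endId: "15" }

  // Fetch results beginning at the start of the gap.
  const records = await fetchRecordsFrom(range.startId);

  let lastProcessedId = range.startId;
  for (const record of records) {
    if (record.id === range.endId) break; // endId hit: the gap is closed
    await processRecord(record);
    lastProcessedId = record.id;
  }

  // Report how far we got so the tracker can mark the range as complete.
  tracker.completeRange(range.startId, lastProcessedId);
}
```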
Let's assume we fetch records [40 - 50], then we wait 10 days, and now there are records 1-50 in the API. However, our batch size is only set to 10.
When we process the second batch, we process the most recent records first, so we process [1 - 10]. This leaves us with the completed item ranges:
[1-10] [40-50]
So, we now have a gap between 10-40.
The next time we process a batch of 10, let's assume there are 2 new records, so the database has [-1, 0, 1-50].
The new batch picks up the 2 new records and then continues backfilling the gap, so the first completed range extends and merges with the previously completed [1-10]. The next processing batch therefore results in the following completed ranges:
[-1, ..., 17] [40-50]
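A rough sketch of that merge step, using plain numeric intervals purely for readability (the helper below is hypothetical, not the actual tracker code; real boundaries would be opaque strings):

```ts
// Hypothetical helper: merge touching or overlapping intervals so that
// backfilled batches collapse into any ranges they reach.
type Interval = { lo: number; hi: number };

function mergeIntervals(intervals: Interval[]): Interval[] {
  const sorted = [...intervals].sort((a, b) => a.lo - b.lo);
  const merged: Interval[] = [];
  for (const iv of sorted) {
    const last = merged[merged.length - 1];
    if (last && iv.lo <= last.hi + 1) {
      // Touching or overlapping: extend the previous interval, leaving no gap.
      last.hi = Math.max(last.hi, iv.hi);
    } else {
      merged.push({ ...iv });
    }
  }
  return merged;
}

// Pieces covered after the latest batch in the example above:
const completed = mergeIntervals([
  { lo: -1, hi: 0 },  // the 2 brand-new records
  { lo: 1, hi: 10 },  // completed in the previous batch
  { lo: 11, hi: 17 }, // backfilled portion of the old gap
  { lo: 40, hi: 50 }, // the original completed range
]);
// completed ≈ [ { lo: -1, hi: 17 }, { lo: 40, hi: 50 } ]
```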
Issue
The current `ItemRangeTracker` is based on the `nextPageToken` field of the response data and the `pageToken` parameter of the query, which represent IDs. Those fields are specific to Google APIs and aren't available from other providers.
Suggestion
So I suggest managing ranges based on timestamps, which are very common. Instead of using a `completedRanges` object array, we can utilize a single object, and determining the next range and backfilling will be much clearer.
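A rough sketch of one possible shape for such a timestamp-based object, assuming ISO-8601 strings as boundaries; the field names (`syncedUpTo`, `backfilledTo`, `oldestKnown`) are hypothetical:

```ts
// Hypothetical sketch of a timestamp-based sync state object.
// Everything newer than `syncedUpTo` still needs fetching on the next run;
// everything between `oldestKnown` and `backfilledTo` still needs backfilling.
interface TimestampSyncState {
  syncedUpTo: string;   // newest timestamp that has been fully processed
  backfilledTo: string; // oldest timestamp reached so far while backfilling
  oldestKnown: string;  // oldest timestamp the provider reports having
}

const state: TimestampSyncState = {
  syncedUpTo: "2024-05-01T12:00:00Z",
  backfilledTo: "2024-04-20T00:00:00Z",
  oldestKnown: "2023-01-01T00:00:00Z",
};

// Next forward range:  (state.syncedUpTo, now]
// Next backfill range: [state.oldestKnown, state.backfilledTo)
```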
Feasibility
Google also supports timestamps in queries, including for YouTube.
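For example, the YouTube Data API search endpoint accepts RFC 3339 timestamps. This is a hypothetical sketch using the googleapis Node client; the `youtube`, `sourceChannelId`, and `range` variables are assumptions mirroring the snippets below.

```ts
// Hypothetical: YouTube Data API v3 search, filtered by publish timestamps.
const response = await youtube.search.list({
  part: ["id", "snippet"],
  channelId: sourceChannelId,
  order: "date",
  publishedAfter: range.startId,  // from timestamp (RFC 3339)
  publishedBefore: range.endId,   // end timestamp (RFC 3339)
  maxResults: 50,
});
```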
Slack (`conversations.history` accepts `oldest`/`latest` timestamps):

```ts
const response = await apiClient.conversations.history({
  channel: chatGroup.sourceId!,
  limit: this.config.messagesPerGroupLimit,
  oldest: range.startId, // from timestamp
  latest: range.endId, // end
});
```
Discord (discord.js `messages.fetch` accepts snowflake IDs, which encode timestamps):

```ts
const fetchedMessages = await channel.messages.fetch({
  after: range.startId, // encode timestamp
  before: range.endId,
});
```