flipp-oss / deimos

Framework to work with Kafka, Avro and ActiveRecord

Fix db_poller fetching so it does not use the saved last_sent_id on initial poll and saves the poll time as last_sent #156

Status: Open. rzmong opened this pull request 2 years ago.


Description

The db_poller polls at regular intervals, using the last_sent and last_sent_id columns stored in the deimos_poll_info table to decide where to resume. During a single poll, the db_poller may process multiple batches of records (the default batch size is 1000).

After an initial poll_query, the db_poller saves the updated_at and id of the last record it processed as last_sent and last_sent_id, respectively. For example:

Updates at time of initial poll:

| updated_at | id |
|---|---|
| 2 | 100 |
| 2 | 200 |
| 2 | 300 |

deimos_poll_info table after initial poll:

| last_sent | last_sent_id |
|---|---|
| 2 | 300 |
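The bookkeeping above can be sketched in Ruby, the language deimos is written in. This is a simplified, hypothetical model rather than the db_poller source: rows are plain structs, Row and PollInfo are illustrative names, and BATCH_SIZE is shrunk to 2 so that the three updates span two batches.

```ruby
# Hypothetical model of the pre-PR bookkeeping (not the deimos source).
Row = Struct.new(:id, :updated_at)
PollInfo = Struct.new(:last_sent, :last_sent_id)

BATCH_SIZE = 2 # assumption: the real default is 1000

rows = [Row.new(100, 2), Row.new(200, 2), Row.new(300, 2)]
info = PollInfo.new(0, 0)

# Process rows in (updated_at, id) order, one batch at a time, and after
# each batch remember the updated_at and id of the last processed record.
rows.sort_by { |r| [r.updated_at, r.id] }.each_slice(BATCH_SIZE) do |batch|
  last = batch.last
  info.last_sent = last.updated_at
  info.last_sent_id = last.id
end

puts "last_sent=#{info.last_sent} last_sent_id=#{info.last_sent_id}"
# prints "last_sent=2 last_sent_id=300"
```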

On the next poll, last_sent is used to set the time bounds (time_from: is 2, and time_to: is approximately the time of the second poll), and last_sent_id is passed as min_id to the next poll_query(time_from:, time_to:, column_name:, min_id:). Suppose the following updates have occurred since the initial poll:

Updates at time of second poll:

| updated_at | id |
|---|---|
| 3 | 100 |
| 4 | 200 |
| 5 | 600 |

poll_query receives 300 as min_id and has no way of knowing that it should look for updates across all id values, so the updates to ids 100 and 200 are missed.
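A minimal sketch of how this happens, assuming a hypothetical poll_query (naive_poll_query below) that applies min_id as a plain lower bound on id for every row in the time window. The query a given producer defines may treat min_id differently; the point is only that a query trusting min_id cannot see ids at or below it. Row 300 is assumed to still have updated_at=2, and the second poll is assumed to run at t=6.

```ruby
# Hypothetical query: min_id is applied to every row, not just to rows
# sharing time_from (an assumption for illustration).
Row = Struct.new(:id, :updated_at)

def naive_poll_query(rows, time_from:, time_to:, min_id:)
  rows.select do |r|
    r.updated_at >= time_from && r.updated_at <= time_to && r.id > min_id
  end
end

# Table state at the time of the second poll.
rows = [Row.new(100, 3), Row.new(200, 4), Row.new(300, 2), Row.new(600, 5)]

# Saved from the initial poll: last_sent = 2, last_sent_id = 300.
found = naive_poll_query(rows, time_from: 2, time_to: 6, min_id: 300)
puts found.map(&:id).inspect # prints "[600]": ids 100 and 200 are missed
```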

If we set min_id to 0 when starting a new poll interval (i.e. when batch_count == 0), poll_query can trust min_id to be valid and will find updates across all id values.
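Continuing the same hypothetical model (naive_poll_query and the min_id_for helper are illustrative names, not deimos APIs), resetting min_id to 0 for the first batch of an interval recovers ids 100 and 200. Note that id 300 also comes back, which is the problem addressed in the next paragraph.

```ruby
Row = Struct.new(:id, :updated_at)

# Hypothetical query that applies min_id to every row (assumption).
def naive_poll_query(rows, time_from:, time_to:, min_id:)
  rows.select { |r| r.updated_at >= time_from && r.updated_at <= time_to && r.id > min_id }
end

# Hypothetical helper: ignore the saved last_sent_id on the first batch of a
# new poll interval; later batches continue from the previous batch's last id.
def min_id_for(batch_count, last_sent_id)
  batch_count.zero? ? 0 : last_sent_id
end

rows = [Row.new(100, 3), Row.new(200, 4), Row.new(300, 2), Row.new(600, 5)]
found = naive_poll_query(rows, time_from: 2, time_to: 6, min_id: min_id_for(0, 300))
puts found.map(&:id).sort.inspect # prints "[100, 200, 300, 600]"
```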

However, because 2 is stored as last_sent, poll_query will receive a time_from of 2 and a time_to of approximately the time of the second poll. Now that we query across all id values, poll_query could pull in records from the previous poll that have updated_at=2. To avoid this, we can store time_to as the value of last_sent once all processing for a poll interval has completed. This should ensure that already processed records are not included in subsequent polls.
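Under the same assumptions, the effect of storing time_to instead of the last record's updated_at can be sketched as follows. The initial poll is assumed to have run at t=2.5 (so its time_to is 2.5) and the second at t=6; both queries already use min_id = 0.

```ruby
Row = Struct.new(:id, :updated_at)

# Hypothetical query that applies min_id to every row (assumption).
def naive_poll_query(rows, time_from:, time_to:, min_id:)
  rows.select { |r| r.updated_at >= time_from && r.updated_at <= time_to && r.id > min_id }
end

rows = [Row.new(100, 3), Row.new(200, 4), Row.new(300, 2), Row.new(600, 5)]

last_record_time = 2     # old: updated_at of the last processed record
first_poll_time_to = 2.5 # new: time_to of the completed poll interval (assumed)

with_record_time = naive_poll_query(rows, time_from: last_record_time, time_to: 6, min_id: 0)
with_time_to = naive_poll_query(rows, time_from: first_poll_time_to, time_to: 6, min_id: 0)

puts with_record_time.map(&:id).sort.inspect # prints "[100, 200, 300, 600]": 300 re-sent
puts with_time_to.map(&:id).sort.inspect     # prints "[100, 200, 600]"
```

Because this sketch's lower bound is inclusive, a record whose updated_at equals the stored time_to exactly would still match both polls; how a real poll_query handles that boundary depends on its implementation.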

So in this PR I've changed the db_poller to set min_id for poll_query to 0 when batch_count == 0, and to store the time_to value as last_sent once all processing for a poll interval has completed.
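Putting both changes together, one poll interval might look roughly like the sketch below. It is a hypothetical model, not the code in this PR: poll_query here is an assumed keyset-style condition (rows after (time_from, min_id) in (updated_at, id) order, up to time_to) so that later batches within an interval resume correctly, BATCH_SIZE is shrunk to 2, and the starting PollInfo of (2.5, 300) assumes the initial poll ran at t=2.5.

```ruby
# Hypothetical model of one poll interval with both changes applied.
Row = Struct.new(:id, :updated_at)
PollInfo = Struct.new(:last_sent, :last_sent_id)

BATCH_SIZE = 2 # assumption: the real default is 1000

# Assumed keyset-style query: rows after (time_from, min_id), up to time_to.
def poll_query(rows, time_from:, time_to:, min_id:)
  rows.select do |r|
    (r.updated_at == time_from && r.id > min_id) ||
      (r.updated_at > time_from && r.updated_at <= time_to)
  end
end

# Runs one poll interval and returns the rows that would be sent.
def poll_interval(rows, info, time_to)
  sent = []
  time_from = info.last_sent
  batch_count = 0
  loop do
    # Change 1: ignore the saved last_sent_id when starting a new interval.
    min_id = batch_count.zero? ? 0 : info.last_sent_id
    batch = poll_query(rows, time_from: time_from, time_to: time_to, min_id: min_id)
            .sort_by { |r| [r.updated_at, r.id] }
            .first(BATCH_SIZE)
    break if batch.empty?

    batch_count += 1
    sent.concat(batch)
    info.last_sent = batch.last.updated_at
    info.last_sent_id = batch.last.id
    time_from = info.last_sent
  end
  # Change 2: once the interval is complete, store time_to as last_sent.
  info.last_sent = time_to
  sent
end

rows = [Row.new(100, 3), Row.new(200, 4), Row.new(300, 2), Row.new(600, 5)]
info = PollInfo.new(2.5, 300)
sent = poll_interval(rows, info, 6)
puts sent.map(&:id).inspect # prints "[100, 200, 600]"
puts info.to_a.inspect      # prints "[6, 600]"
```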

Type of change


How Has This Been Tested?


Checklist: