I believe that when the re-run occurs, it reads the current state and obtains the record_last_received_datetime from the state dictionary. I believe the code is currently pointing to self.replication_key rather than self.replication_key_value, which contains the timestamp.
To prove the scenario, I wrote a simple Python program that parses the JSON from the current state, which was saved from the initial ingestion.
import json
import pendulum
from typing import cast
import datetime
my_state = '{"completed": {"singer_state": {"bookmarks": {"connections": {"partitions": [{"context": {"account_id": "5"}}]}, "environments": {"partitions": [{"context": {"account_id": "5"}}]}, "jobs": {"partitions": [{"context": {"account_id": "5"}}]}, "projects": {"partitions": [{"context": {"account_id": "5"}}]}, "repositories": {"partitions": [{"context": {"account_id": "5"}}]}, "runs": {"partitions": [{"context": {"account_id": "5"}, "replication_key": "finished_at", "replication_key_value": "2024-09-09 11:01:05.436229+00:00"}]}, "users": {"partitions": [{"context": {"account_id": "5"}}]}, "accounts": {}}}}, "partial": {}}'
my_state_dict = json.loads(my_state)
replication_key_value = my_state_dict['completed']['singer_state']['bookmarks']['runs']['partitions'][0]['replication_key_value']
replication_key = my_state_dict['completed']['singer_state']['bookmarks']['runs']['partitions'][0]['replication_key']
print(f'Last replication_key_value as a string = {replication_key_value}')
print(f'Last replication_key as a string = {replication_key}')
# Use pendulum for replication_key_value timestamp conversion
pendulum_last_received_datetime: pendulum.DateTime = cast(pendulum.DateTime, pendulum.parse(replication_key_value))
print(f'Pendulum = {pendulum_last_received_datetime}')
# Use datetime for replication_key_value timestamp conversion
new_last_received_datetime = datetime.datetime.fromisoformat(replication_key_value)
print(f'Datetime = {new_last_received_datetime}')
# Use Monkey Patch backport for replication_key_value timestamp conversion
if 1 == 1:
    from backports.datetime_fromisoformat import MonkeyPatch
    MonkeyPatch.patch_fromisoformat()
monkeypatch_last_received_datetime = datetime.datetime.fromisoformat(replication_key_value)
print(f'Monkey Patch Datetime = {monkeypatch_last_received_datetime}')
# Use replication_key for timestamp conversion - this should break
incorrect_datetime_key = datetime.datetime.fromisoformat(replication_key)
The result reproduces the issue:
(venv) test_tap_gitlab]$ python test_gitlab_replication.py
Last replication_key_value as a string = 2024-09-09 11:01:05.436229+00:00
Last replication_key as a string = finished_at
Pendulum = 2024-09-09 11:01:05.436229+00:00
Datetime = 2024-09-09 11:01:05.436229+00:00
Monkey Patch Datetime = 2024-09-09 11:01:05.436229+00:00
Traceback (most recent call last):
  File "/home/me/test_tap_gitlab/test_gitlab_replication.py", line 35, in <module>
    incorrect_datetime_key = datetime.datetime.fromisoformat(replication_key)
                             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: Invalid isoformat string: 'finished_at'
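For comparison, here is a minimal sketch of the lookup I would expect, assuming the partition state has the same shape as the saved state above. The helper name bookmark_datetime is hypothetical and is not taken from the tap's code; it only illustrates parsing replication_key_value (the timestamp) instead of replication_key (the column name).

```python
from datetime import datetime
from typing import Optional


def bookmark_datetime(partition_state: dict) -> Optional[datetime]:
    """Return the bookmark timestamp stored in a partition's state, if any.

    Hypothetical helper: it reads "replication_key_value" (the timestamp),
    not "replication_key" (the column name, e.g. "finished_at").
    """
    value = partition_state.get("replication_key_value")
    if value is None:
        # No bookmark has been written yet, e.g. on the first run.
        return None
    return datetime.fromisoformat(value)


# Partition entries copied from the saved state shown earlier.
runs_partition = {
    "context": {"account_id": "5"},
    "replication_key": "finished_at",
    "replication_key_value": "2024-09-09 11:01:05.436229+00:00",
}
fresh_partition = {"context": {"account_id": "5"}}

print(bookmark_datetime(runs_partition))
print(bookmark_datetime(fresh_partition))
```

Returning None for a partition without a bookmark is my own choice here, so that streams which have never stored a replication_key_value do not raise.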
Incremental replication from tap-dbt is failing with the following error on line 120 of streams.py.