Closed dwreeves closed 1 year ago
Thank you @dwreeves for providing detailed requirements. Appreciate it. Would you like to create a PR for this proposal?
@phanikumv Yeah, I can get to it in the next couple days.
Hold on a second... 🤔❗
It looks like the `xcom` parameter sort of does the thing I want it to do... only sort of.
First of all, the name `xcom` is highly misleading. The timestamp doesn't necessarily need to come from XCom; it just comes from Jinja templating. For example, `{{ data_interval_end }}` would be a perfectly valid input that doesn't use XCom. I would propose renaming this to `target_time` (in line with the convention of `DateTimeSensor`) or `target_completed_time`.
Second, the behavior is not well documented, and could use a description or example in the README.
Still, there are differences between what I am proposing and what currently exists. With `FivetranSensor(..., xcom="{{ data_interval_end }}")`, the sensor will always fail as long as there is any failure after `{{ data_interval_end }}`, even if the latest sync was a success.
For example, imagine a DAG on an `@daily` schedule starting January 1st. In this case, the DAG will look like this:
Su | Mo | Tu | We | Th | Fr | Sa |
---|---|---|---|---|---|---|
❌ | ❌ | ❌ | ❌ | ❌ | ❌ | ❌ |
❌ | ❌ | ❌ | ✅ | ✅ | ⌛ | ⌛ |
⌛ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
⬜ | ⬜ | ⬜ |
Whereas the behavior we would like is:
Su | Mo | Tu | We | Th | Fr | Sa |
---|---|---|---|---|---|---|
✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
✅ | ✅ | ✅ | ✅ | ✅ | ⌛ | ⌛ |
⌛ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ | ⬜ |
⬜ | ⬜ | ⬜ |
Obviously this is a very different behavior than what we would want when using `xcom="{{ data_interval_end }}"`. And in my opinion, the current behavior is not sensible, at least in the context of using a "target time" in the sense of how `DateTimeSensor` works.
The reason for this is that the order of operations is:

1. If `failed_at > previous_completed_at`, then fail.
2. If `sync_state == "rescheduled" and connector_details["schedule_type"] == "manual"`, then wait.
3. If `max([failed_at, succeeded_at]) > previous_completed_at`, then pass.

Note that because `max([failed_at, succeeded_at]) > target_completed_time` is run after `failed_at > target_completed_time` has already been shown to be false, the third condition can be reduced to simply checking `succeeded_at > target_completed_time`. Also, I am renaming `previous_completed_at` to `target_completed_time`. So I restate the existing control flow equivalently as follows:
Existing flow:

1. If `failed_at > target_completed_time`, then fail.
2. If `sync_state == "rescheduled" and schedule_type == "manual"`, then restart and wait.
3. If `succeeded_at > target_completed_time`, then pass.

And so the failures for all the previous dates occur because a latest state of failure always takes precedence over any successes. Essentially, the existing control flow treats `failed_at > target_completed_time > succeeded_at` and `failed_at > succeeded_at > target_completed_time` the same way and fails both.
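To make the restated flow concrete, here is a small pure-Python sketch (function and exception names are illustrative; this is not the provider's actual code):

```python
from datetime import datetime, timezone

def existing_poke(
    failed_at: datetime,
    succeeded_at: datetime,
    target_completed_time: datetime,
    sync_state: str = "scheduled",
    schedule_type: str = "auto",
) -> bool:
    """Sketch of the existing FivetranSensor control flow.

    True = pass, False = keep waiting, raise = fail.
    """
    # 1. Any failure newer than the target time fails the sensor immediately...
    if failed_at > target_completed_time:
        raise RuntimeError("sync failed after target_completed_time")
    # 2. ...then a manually scheduled, rescheduled connector keeps waiting...
    if sync_state == "rescheduled" and schedule_type == "manual":
        return False
    # 3. ...and only then is a success after the target time checked.
    return succeeded_at > target_completed_time

day = lambda d: datetime(2023, 1, d, tzinfo=timezone.utc)
# A failure newer than the target always wins, even if a success is newer still:
# existing_poke(failed_at=day(12), succeeded_at=day(13), target_completed_time=day(5))
# raises, despite the latest sync being a success.
```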
Whereas the order of operations I am proposing is:

Proposed flow:

There is no way to emulate the exact behavior of my proposal in the original post unless I add another kwarg, something like `always_fail_if_latest_status_is_failed`. This gives us the following flow:

1. If `always_wait_when_syncing and sync_state == "syncing"`, then wait.
2. If `failed_at > target_completed_time and always_fail_if_latest_status_is_failed`, then fail.
3. If `succeeded_at > target_completed_time`, then pass.
4. If `sync_state == "rescheduled" and schedule_type == "manual"`, then restart and wait.
5. If `failed_at > target_completed_time > succeeded_at`, then fail.
6. If `failed_at > succeeded_at > target_completed_time and propagate_failures_forward`, then fail.

Note that if you set:

- `always_fail_if_latest_status_is_failed=True`
- `always_wait_when_syncing=False`

then this is almost perfectly a refactor, with the exception that the "restart and wait" step is flipped with the "if `succeeded_at > target_completed_time` then pass" step.
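As a sketch, the proposed flow in pure Python could look like the following (illustrative only; a real implementation would live in the sensor's poke method and pull timestamps from the hook):

```python
from datetime import datetime, timezone

def proposed_poke(
    failed_at: datetime,
    succeeded_at: datetime,
    target_completed_time: datetime,
    sync_state: str = "scheduled",
    schedule_type: str = "auto",
    always_wait_when_syncing: bool = False,
    always_fail_if_latest_status_is_failed: bool = True,
    propagate_failures_forward: bool = True,
) -> bool:
    """Sketch of the proposed flow. True = pass, False = keep waiting."""
    # 1. Optionally never read from a connector mid-sync.
    if always_wait_when_syncing and sync_state == "syncing":
        return False
    # 2. Optionally preserve the old "any newer failure fails" behavior.
    if failed_at > target_completed_time and always_fail_if_latest_status_is_failed:
        raise RuntimeError("sync failed after target_completed_time")
    # 3. A success after the target time passes the sensor.
    if succeeded_at > target_completed_time:
        return True
    # 4. Manually scheduled, rescheduled connectors get restarted; keep waiting.
    if sync_state == "rescheduled" and schedule_type == "manual":
        return False
    # 5. A failure after the target with no newer success fails the sensor.
    if failed_at > target_completed_time > succeeded_at:
        raise RuntimeError("sync failed after target_completed_time")
    # 6. (As ordered above, this branch is shadowed by step 3; it is kept here
    # only to mirror the list verbatim.)
    if failed_at > succeeded_at > target_completed_time and propagate_failures_forward:
        raise RuntimeError("propagating failure forward")
    return False

day = lambda d: datetime(2023, 1, d, tzinfo=timezone.utc)
# A backfill run with an old target passes off the latest success, even though
# an even newer failure exists (with the always-fail flag turned off):
assert proposed_poke(
    failed_at=day(20),
    succeeded_at=day(19),
    target_completed_time=day(3),
    always_fail_if_latest_status_is_failed=False,
) is True
```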
In practice I am not convinced it is a big deal to flip these for the existing `FivetranSensor`. If you just use the `FivetranSensor` completely unadorned, then the `target_completed_time` is equal to the previous last sync time, and if the sensor is ever "rescheduled" between the last run and next run, then the next run has to have not happened.
However, flipping these does matter for the behavior I am proposing because if the connector is rescheduled but has still succeeded historically, the user should be able to pass the backfill up to the latest success.
I am bringing all of this up because it raises the question of what should be done. Essentially, my proposal right now could be implemented in `FivetranSensor` by adding 3 additional kwargs (`always_wait_when_syncing`, `always_fail_if_latest_status_is_failed`, and `propagate_failures_forward`). But that seems very complex. I am going to sit on this a bit and get back.
In any event, I would recommend renaming the `xcom` parameter to `target_completed_time`, and parsing it the same way that `DateTimeSensor` parses datetimes (meaning, allow not only `str` inputs but also `datetime` inputs). This would make it clearer to users that the value doesn't technically need to come from XCom to be a valid input, and it would make the API of the `FivetranSensor` more general and more similar to other Airflow APIs.

Renaming `xcom` is still not sufficient on its own; I just think it is a start as I think through the full API...
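For reference, a sketch of the parsing I have in mind, mirroring the normalization `DateTimeSensor` does in its constructor (the function name here is made up for illustration):

```python
from datetime import datetime
from typing import Union

def normalize_target_time(target_time: Union[str, datetime]) -> str:
    """Accept either a datetime or a (possibly Jinja-templated) string,
    in the spirit of how DateTimeSensor normalizes its target_time argument.
    Datetimes are stored as ISO strings so the field stays templatable."""
    if isinstance(target_time, datetime):
        return target_time.isoformat()
    if isinstance(target_time, str):
        return target_time
    raise TypeError(
        f"Expected str or datetime.datetime, got {type(target_time).__name__}"
    )
```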
Alright, I thought about it a bit... you can mostly ignore my word vomit above.
I think the biggest question I have is: Is there a downside with the current design to swapping from (essentially) this...
```python
if failed_at > target_completed_time:
    raise AirflowException
elif succeeded_at > target_completed_time:
    return True
else:
    return False
```
... to (essentially) this?
```python
if succeeded_at > target_completed_time:
    return True
elif failed_at > target_completed_time:
    raise AirflowException
else:
    return False
```
I think for 99% of users, the answer is essentially that there is no difference. The reason is that if the timestamp is being set based on the last completed time, then it is unlikely you will ever run into a situation where both a `succeeded_at` and a `failed_at` are greater than the `target_completed_time`.
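Here is a small runnable illustration of that claim (names are illustrative):

```python
from datetime import datetime, timezone

def old_order(failed_at, succeeded_at, target):
    # Current behavior: a newer failure always wins.
    if failed_at > target:
        raise RuntimeError("failed")
    return succeeded_at > target

def new_order(failed_at, succeeded_at, target):
    # Proposed behavior: a newer success always wins.
    if succeeded_at > target:
        return True
    if failed_at > target:
        raise RuntimeError("failed")
    return False

day = lambda d: datetime(2023, 1, d, tzinfo=timezone.utc)

# Typical case: only one completion is newer than the target, so both agree.
assert old_order(day(1), day(9), day(5)) is new_order(day(1), day(9), day(5)) is True

# Both completions newer than the target: the old order raises, the new one passes.
assert new_order(day(8), day(9), day(5)) is True  # old_order(...) would raise here
```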
So, it should be safe to switch this logic. And this change in the logic would allow using the `xcom` kwarg (rename pending...) to set it to `{{ data_interval_end }}` and have that pass all backfill DAG runs up to the latest success date.

So overall, I think this change in logic would be extremely beneficial: it would allow the `FivetranSensor` to work for backfill DAG runs with minimal extra complexity added, and it should not have any real impact on existing users. (This repo only has 12 stars for now, anyway, and probably isn't mass adopted yet.)
What do you think? I will be putting this logic into a PR for further consideration.
## Overview of existing `FivetranSensor` behavior

The `FivetranSensor` does the following:
I feel that this control flow is not what a majority of users will find optimal for a couple reasons.
First, any user triggering Fivetran via the `FivetranOperator` should probably prefer to use an `ExternalTaskSensor` over the `FivetranSensor`. This means the `FivetranSensor` is at its best when waiting on jobs scheduled by Fivetran automatically, rather than jobs scheduled by Airflow (and thus scheduled in Fivetran "manually").

For that reason, all examples and issues described below assume the user is using the sensor to wait on Fivetran jobs scheduled by Fivetran rather than jobs scheduled by Airflow. Many of the issues I will be describing still apply when scheduling Fivetran jobs via Airflow (see my implementation proposal section, where I discuss a feature that would make sense to add to `FivetranOperator`), but these issues are more pronounced and easier to understand when thinking about the Fivetran scheduler interoperating with the Airflow scheduler.

## The main issue: backfilling (with 2 examples)
The main issue is that when waiting on Fivetran scheduled jobs, a DAG that is being backfilled does not necessarily need to wait for a new Fivetran job.
### Scenario A (Last fail > Last success)

Imagine the following situation (Scenario A): a DAG on an `@daily` schedule starting January 1st. The user wants the following behavior to occur: (This resembles `depends_on_past=True`, but there is a subtle difference between that and this.)

This will allow the user to backfill data up to January 10th, at which point the DAG will fail:
The user in this case implements the `FivetranSensor`, and instead the way their DAG works is that it waits for the next Fivetran job to trigger, even though the backfilled runs don't need to wait around for anything.

For this small backfill, and with Fivetran jobs that aren't failing, this isn't a huge deal; but when backfilling years of data in daily chunks, this can slow down a backfill a lot unnecessarily. Imagine you sync your Fivetran data once a day, and your Airflow DAG's max active DagRuns is 10. In this case, a `FivetranSensor` bottlenecks your backfill to 10 DagRuns a day, and the only ways around it are to implement complex control flow logic (e.g. `BranchDateTimeOperator`), write a custom implementation, or run the backfill as a separate DAG.
### Scenario B (Last success > Last fail)

Scenario A was designed to show the full range of behaviors, but a far more typical scenario is that the last success occurred much more recently than the last failure.

Imagine the following situation (Scenario B): a DAG on an `@daily` schedule starting January 1st. The user wants the following behavior to occur:

The desired behavior when last fail time > last success time was up to interpretation, but here it is more straightforward: we should just be waiting.
## Other issues

### Fault tolerance

The `FivetranSensor` is not fault tolerant in the sense that, if the sensor is restarted, the new instance of the sensor may end up waiting on a different datetime than the previous instance.

For example, imagine a user has some sort of error that causes the task to fail (it doesn't even need to come from the sensor itself; it could be, e.g., an OOM error on the worker running the job). If a Fivetran job completes between the time the task failed and the time it was resumed, the sensor will now be waiting on a different Fivetran job to complete.
### Race condition (into cascading failures)

Imagine a user syncs their Fivetran job once every hour on the hour, and their Airflow DAG also runs once every hour on the hour. One of the tasks within the DAG is a `FivetranSensor`.

Imagine the Fivetran job typically completes 55 seconds after the hour. This means that if the `FivetranSensor` is not executed by `00:00:55`, then the sensor will end up waiting a whole hour, i.e. it completes at around `01:00:55`.

It gets worse from here. Imagine the whole DAG is configured with `default_args={"depends_on_past": True}`. The `FivetranSensor` with execution date `00:00:00` that got stuck for an hour will block the next sensor task from getting scheduled, meaning that the `FivetranSensor` with execution date `01:00:00` won't finish until `02:00:55`. This is a cascading failure!

## Implementation proposal
### `FivetranDateTimeSensor`

The core functionality I am proposing is a `FivetranDateTimeSensor`:

- A variant of the existing `FivetranSensor`.
- Behavior in the spirit of how `DateTimeSensor` works, which is that it waits for a time to pass but passes immediately on backfills.

The majority of the functionality can just become a new method in the hook.
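For illustration, such a hook method might boil down to parsing the completed-at timestamps out of the connector details payload. The field names below follow Fivetran's connector object (`succeeded_at` / `failed_at`), but the parsing is a simplified sketch, not the provider's actual code:

```python
from datetime import datetime

def extract_completed_times(connector_details: dict):
    """Sketch of a hook-level helper: pull the completed-at timestamps out of
    a Fivetran connector details payload. A real hook method would fetch this
    payload via the Fivetran REST API and handle its exact timestamp format."""
    succeeded_at = datetime.fromisoformat(connector_details["succeeded_at"])
    failed_at = datetime.fromisoformat(connector_details["failed_at"])
    return succeeded_at, failed_at
```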
Additional control flow kwargs:

- `target_time: datetime | str = "{{ data_interval_end }}"`. (Note: this is a templated field.) This kwarg name comes directly from `DateTimeSensor`, albeit here it is optional. This is the timestamp that the Fivetran completed time is compared to.
- `propagate_failures_forward: bool = True` - When this flag is True, the sensor fails when `context["data_interval_end"] > fivetran_failed_at > fivetran_succeeded_at`. If the flag is False, then instead it will wait around until there is a new completed-at time.
- `always_wait_when_syncing: bool = False` - Fivetran syncs in chunks, and this can cause issues in some situations when reading from a database that is currently being synced. Imagine, for example, a transform job that "increments" using `select max(write_time) from tbl`. Fivetran writes in chunks that are not necessarily ordered by `write_time`, meaning that doing work while Fivetran is syncing can cause you to skip data. (This is not the best example, because you probably wouldn't have a backfill job touching the most recent `write_time` data, but this can still happen in other contexts.)

I see it as uncommon in most Airflow provider packages to create large inheritance trees, so it is reasonable to simply implement all of this as a subclass of `BaseSensorOperator` and just allow the `FivetranHook` abstraction to do most of the heavy lifting.

### Additional kwarg for `FivetranOperator`
On the topic of supporting backfills in a sensible manner, the `FivetranOperator` should also have a kwarg that skips running the Fivetran job when `fivetran_succeeded_at > context["data_interval_end"]`. I'm not sure what a good name for this kwarg may be.
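Whatever the name ends up being, the check itself is tiny. A sketch (function and kwarg names are placeholders):

```python
from datetime import datetime, timezone

def should_skip_sync(
    data_interval_end: datetime, fivetran_succeeded_at: datetime
) -> bool:
    """Sketch of the proposed FivetranOperator skip check: skip triggering a
    new sync when the connector already succeeded after this DagRun's data
    interval end (i.e., a backfill run has nothing new to wait for)."""
    return fivetran_succeeded_at > data_interval_end

end = datetime(2023, 1, 5, tzinfo=timezone.utc)
assert should_skip_sync(end, datetime(2023, 1, 9, tzinfo=timezone.utc))      # backfill: skip
assert not should_skip_sync(end, datetime(2023, 1, 1, tzinfo=timezone.utc))  # run the sync
```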
`skip_if_succeeded_after_data_interval_end` is a little on the verbose side, but it is an accurate description. I'd love to hear if anyone has ideas for a snappier name, though.

For backwards compatibility, it should be introduced with a default of `False`, albeit I do believe this would be a good default in a future release.

## Other notes
I already have a version of this implemented in a production system, and am willing to implement a version of this functionality in this open source library.
Regardless of what happens, the documentation for the existing `FivetranSensor` should be clearer about what is happening.

EDIT: I realize I should be using `data_interval_end` and not `logical_date`. The logical date would be sensible if we knew when Fivetran syncs start, but we do not have that info available; we only know when a sync ends.