Azure / Azure-DataFactory

Change Data Capture tracking deletes from SQL Server does not work with Delta sink #531

Open stephen-bowser opened 1 year ago

stephen-bowser commented 1 year ago

Hi, I am testing the new change data capture feature. When rows are deleted from my Azure SQL Server source, they are not deleted from the destination Delta table; instead, the deleted row is appended again. I appreciate that you may not want to run an actual DELETE operation on the Delta table because of the associated overhead, but I would at least expect a boolean soft-delete flag so we know how to treat these rows downstream. Hope that makes sense.

edit: to clarify, I am using the Change Data Capture resource rather than a mapping data flow.

update: this issue also occurs with a flat file destination
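
For background, a minimal sketch showing that SQL Server's native CDC change table already records the operation type, so a delete could in principle be surfaced as a flag in the sink; the capture instance name dbo_PlannedActivity and the connection details below are placeholders rather than my actual setup:

import pyodbc

# Placeholder connection details for an Azure SQL database with CDC enabled.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=myserver.database.windows.net;DATABASE=mydb;UID=myuser;PWD=mypassword"
)

# __$operation in the CDC change table: 1 = delete, 2 = insert, 3/4 = update (before/after image).
query = """
DECLARE @from_lsn BINARY(10) = sys.fn_cdc_get_min_lsn('dbo_PlannedActivity');
DECLARE @to_lsn   BINARY(10) = sys.fn_cdc_get_max_lsn();
SELECT __$operation, *
FROM cdc.fn_cdc_get_all_changes_dbo_PlannedActivity(@from_lsn, @to_lsn, 'all');
"""

for row in conn.cursor().execute(query):
    # Rows with __$operation = 1 are deletes, which is the information a soft-delete
    # flag in the Delta sink would need to carry.
    print(row)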

VishnuvardhanGajjala commented 1 year ago

Hi Stephen, currently deletes are supported only when the Native CDC option is used. Can you confirm that you are using Native CDC and not the watermark column approach? One more point to note: you need to select key columns in the column mapping in order for deletes to work. While you check on this, could you share the mapperId of the CDC resource or the factory name so we can debug the issue from our end?

stephen-bowser commented 1 year ago

Thank you @VishnuvardhanGajjala, adding the key columns resolved the issue. I can't see this requirement in the documentation anywhere, so it may be worth adding so others do not hit the same problem.

I've now discovered, though, that it is difficult to track the deletes in my downstream applications. Since I am writing a Spark streaming application on top of the Delta table as its source, I need to enable the Delta Lake change data feed on the Delta table that ADF is writing to so that I can keep track of updates and deletes. However, when I enable this on the table, the ADF CDC process fails. I think this might be because ADF is writing to the table using an old Databricks Runtime, since the docs I have linked to say that once this feature is enabled, 'you can no longer write to the table using Databricks Runtime 8.1 or below.'
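
For reference, this is roughly the consumer-side setup that triggers the failure, as a minimal sketch; the SparkSession configuration and the storage path are placeholders, not my real environment:

from pyspark.sql import SparkSession

# Assumes the delta-spark package is installed and the Delta extensions are configured.
spark = (
    SparkSession.builder.appName("cdc-consumer")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

table_path = "abfss://container@account.dfs.core.windows.net/PlannedActivity"  # placeholder path

# Enable the change data feed on the table that ADF writes to ...
spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# ... so a streaming consumer can see inserts, updates and deletes via the
# _change_type, _commit_version and _commit_timestamp columns.
changes = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .load(table_path)
)
changes.writeStream.format("console").start()  # placeholder sink for illustration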

Would it be possible to upgrade the way ADF writes to Delta so that it is compatible with this feature? Let me know if I should open a separate issue for this.

For reference, the mapperId is 03de3c8c736a4f059fd967faf62aedcc

Error message from adf when this feature is enabled:

{
"message": "{\"StatusCode\":\"DFExecutorUserError\",\"Message\":\"Job failed due to reason: at Sink 'SinkdatabasefeeddboPlannedActivity': Delta protocol version is too new for this version of the Databricks Runtime. Please upgrade to a newer release.\",\"Details\":\"org.apache.spark.sql.delta.actions.InvalidProtocolVersionException: Delta protocol version is too new for this version of the Databricks Runtime. Please upgrade to a newer release.\\n\\tat org.apache.spark.sql.delta.DeltaLog.protocolWrite(DeltaLog.scala:294)\\n\\tat org.apache.spark.sql.delta.OptimisticTransactionImpl$class.prepareCommit(OptimisticTransaction.scala:390)\\n\\tat org.apache.spark.sql.delta.OptimisticTransaction.prepareCommit(OptimisticTransaction.scala:80)\\n\\tat org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:287)\\n\\tat org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)\\n\\tat org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)\\n\\tat com.microsoft.spark.telemetry.delta.SynapseLoggingShim$class.recordOperation(SynapseLoggingShim.scala:72)\\n\\tat org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)\\n\"}",
"failureType": "UserError",
"target": "SystemPipeline_03de3c8c736a4f059fd967faf62aedcc"
}
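My working assumption is that enabling the change data feed raises the table's minimum writer protocol version beyond what the runtime behind ADF's Delta sink supports, which would explain the InvalidProtocolVersionException above. A minimal sketch of how to check the table's protocol requirements, using the same placeholder path and SparkSession as in the snippet above:

table_path = "abfss://container@account.dfs.core.windows.net/PlannedActivity"  # placeholder path

# DESCRIBE DETAIL reports the reader/writer protocol versions the table now requires;
# the change data feed needs a newer writer protocol than older runtimes provide.
spark.sql(f"DESCRIBE DETAIL delta.`{table_path}`") \
    .select("minReaderVersion", "minWriterVersion") \
    .show()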
VishnuvardhanGajjala commented 1 year ago

Hi Stephen, yes. Please open a separate issue for this on the mapping data flow side. ADF CDC and mapping data flows internally leverage the same execution engine. Our team can clarify when we will be able to support writing to Delta tables with 'change data feed' enabled. (That feature is available from Delta Lake 2.0.0 onwards.)

stephen-bowser commented 1 year ago

Hi @VishnuvardhanGajjala, to follow up on this, I would like to clarify the behavior of CDC for deletes when key columns are not selected, which is the situation I first described in this ticket. Why is it that when CDC identifies a deleted record, it inserts the record into the table with no indication that it was a delete? This could mislead downstream users of the data, since they have no way of knowing whether a record represents a DELETE or a new record. For this situation, I think the only way to make it clear would be for the ADF CDC resource to mark the row with a soft-delete flag, similar to what tools like Fivetran do with _fivetran_deleted. Does that make sense?
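
For illustration, a minimal sketch of what downstream handling could look like if such a flag existed; the column names _adf_deleted and PlannedActivityId are hypothetical, not something ADF produces today:

from pyspark.sql import functions as F

table_path = "abfss://container@account.dfs.core.windows.net/PlannedActivity"  # placeholder path
rows = spark.read.format("delta").load(table_path)

# With a hypothetical _adf_deleted flag, consumers could separate live rows from
# soft-deleted ones instead of treating every appended row as a new record.
live_rows    = rows.filter(F.col("_adf_deleted") == F.lit(False))
deleted_keys = rows.filter(F.col("_adf_deleted") == F.lit(True)).select("PlannedActivityId")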

rohithqfr commented 1 year ago

Hi Stephen,

For the first item: we agree that the behaviour is not consistent when key columns are not selected on the sink. We will take this as feedback and fix it.

For the second item, enabling soft deletes and tracking change history in the sink is on our roadmap for a future date.

stephen-bowser commented 1 year ago

Hi @rohithqfr, any update on when the issue with deletes on tables with no keys will be resolved? Ideally we really need a flag for the operation type (Insert, Update, Delete).

stephen-bowser commented 1 year ago

Hi @rohithqfr, any update on this?

rohithqfr commented 1 year ago

Hi Stephen,

For the first item, the fix will be to invalidate the scenario: since the source sends all inserts, updates, and deletes, selecting a key will be mandatory in the target. This will be picked up and fixed by the end of April.

For soft deletes and tracking changes in the sink, there is no fixed date or plan available yet.

Thanks & Regards, Rohith