Azure / azure-kusto-spark

Apache Spark Connector for Azure Kusto
Apache License 2.0

Don't delete Kusto resources (e.g. table) immediately upon ingestion throwing a transient service exception #233

Open yihezkel opened 2 years ago

yihezkel commented 2 years ago

Reenactment/Timeline

  1. 10:34:04 – table is created
  2. 11:00:08 to 11:32:09 – 139 ingestions, each up to about a minute apart
  3. 11:32:16 – 1 final ingestion is received by the DM
  4. 11:32:18 – table is dropped, presumably because the Connector thinks it’s done ingesting
  5. 11:37:18 – DM Aggregator sends the 1 final ingestion to DM Importer, presumably because of a 5-minute batching policy
  6. 11:37:18 – 1 final ingestion is received by the Engine, yielding the “Table not found” errors

Issues

There are 2 issues here, at least 1 of which has something of a workaround:

  1. The ingestion batching policy is 5 minutes, so an ingestion can be delayed significantly. Setting a shorter batching policy may work around this, but even if that worked as expected (see https://github.com/Azure/azure-kusto-spark/issues/217), it would not always fix the issue, because the race condition would remain.
  2. The main issue is that presumably a buggy async process in Spark doesn’t ensure all tasks are done before dropping the table. The Connector deletes the table when it encounters an exception during async ingestion, as well as in a finally after a try that polls until all ingestions have completed. So the root cause is that the Kusto service returned an exception, and the Spark Connector gave up and deleted all resources. It had no way of knowing that a message was still queued for ingestion and wouldn't be processed by the importer for another 5 minutes, at which point it fails because the table was deleted 5 minutes earlier.
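The race in the timeline can be checked with a little arithmetic. The following is a minimal sketch, assuming the 5-minute batching window mentioned above; the function name `earliest_possible_safe_drop` is hypothetical and not part of the Connector. It only computes a lower bound: the importer still needs time to process the batch after the window closes.

```python
from datetime import datetime, timedelta

# Presumed batching window from the timeline above (an assumption, not a measured value).
BATCHING_WINDOW = timedelta(minutes=5)


def earliest_possible_safe_drop(last_queued, window=BATCHING_WINDOW):
    """Lower bound on when a message queued at `last_queued` has left batching."""
    return last_queued + window


# Times of day copied from the timeline; only the time component is meaningful here.
last_queued = datetime.strptime("11:32:16", "%H:%M:%S")    # final ingestion received by the DM
table_dropped = datetime.strptime("11:32:18", "%H:%M:%S")  # Connector drops the table

safe_at = earliest_possible_safe_drop(last_queued)
dropped_too_early = table_dropped < safe_at

print(safe_at.strftime("%H:%M:%S"), dropped_too_early)
```

The table was dropped 2 seconds after the last message was queued, well inside the batching window, which is consistent with the "Table not found" errors roughly 5 minutes later.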

Expected behavior

Instead, the Connector should:

  1. Differentiate between transient and permanent errors.
     a. If transient, do not clean up (that is, do not go to the catch/finally). Continue polling to give the service a chance to recover before considering itself done, up to a [timeout], of course.
     b. If permanent, wait a [timeout] amount of time before cleaning up resources. Even though the service is returning a permanent error, ingestion may still be working. We should give it a chance to complete before cleaning up resources, even though we’re blind in this scenario as to whether it will complete or already did.
  2. Create the temp table with the same ingestion batching policy as the destination table (my PR referenced above, which I still need to test and merge), preventing 5-minute waits when inheriting a database batching policy isn’t possible. In fact, the [timeout] above should presumably be just a bit more than the time component of the batching policy we’re using.
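The proposed behavior could look roughly like the following. This is a sketch under stated assumptions, not the Connector's code (the Connector is not written in Python): `TransientServiceError`, `PermanentServiceError`, `finish_ingestion`, `poll_once` and `cleanup` are all hypothetical names, and which real exceptions count as transient still has to be investigated.

```python
import time


class TransientServiceError(Exception):
    """Hypothetical: throttling, network or storage trouble the service may recover from."""


class PermanentServiceError(Exception):
    """Hypothetical: an error the service is not expected to recover from."""


def finish_ingestion(poll_once, cleanup, timeout_s, poll_interval_s=1.0,
                     clock=time.monotonic, sleep=time.sleep):
    """Poll until ingestion completes or the timeout expires, then clean up.

    `poll_once()` returns True once all ingestions have completed and may raise
    either error type. Returns "completed", "timed_out" or "permanent_error".
    """
    deadline = clock() + timeout_s
    outcome = "timed_out"
    while clock() < deadline:
        try:
            if poll_once():
                outcome = "completed"
                break
        except TransientServiceError:
            # Transient: do not clean up yet, keep polling until the deadline.
            pass
        except PermanentServiceError:
            # Permanent: we can no longer observe progress, so wait out the
            # remaining timeout to give queued ingestions a chance to finish.
            outcome = "permanent_error"
            remaining = deadline - clock()
            if remaining > 0:
                sleep(remaining)
            break
        sleep(poll_interval_s)
    # Cleanup (e.g. dropping the temp table) happens only after the waits above.
    cleanup()
    return outcome
```

With a 5-minute batching policy, `timeout_s` might be set to something like `5 * 60 + 30`; the 30-second margin is an illustrative guess, not a recommendation from the source.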

Substantiation and more details

This happens rarely on most clusters and frequently on only a few, and further, the amount of Spark ingestion done by a cluster doesn’t seem to correlate with the number of these issues the cluster encounters. This confirms that the proximate cause is the service having health issues and sending an error response to the Connector, causing it to give up. Comparing the clusters facing this same issue:

    let sources = Usage
    | where KustoClient has_cs "Kusto.Spark.Connector"
    | project Source;
    KustoLogs
    | where Source in (sources) and Level == "Error" and EventText startswith_cs "Table 'sparkTempTable" and EventText has_cs "could not be found" and Timestamp > ago(7d)
    | project Source, Timestamp, Directory, ActivityType, EventText=substring(EventText, 0, 50)
    | summarize count(), min(Timestamp), max(Timestamp), take_any(EventText) by Source
    | top 15 by count_

To the clusters that ingest via the Spark connector:

    Usage
    | where KustoClient has_cs "Kusto.Spark.Connector" and Text startswith_cs ".move " and Text contains_cs "sparkTempTable"
    | summarize count(), Text=take_any(substring(Text, 0, 50)) by Source
    | top 30 by count_

From

    KustoLogs
    | where (Source in ("ADXTPMSDEVADXCLUSTER", "INGEST-ADXTPMSDEVADXCLUSTER") and Timestamp between (datetime(2022-04-06 09:34:03.9231199) .. datetime(2022-04-06 11:37:19.2406867)))
        or (Source in ("HOSTAXISKUSTOPDNS", "INGEST-HOSTAXISKUSTOPDNS") and Timestamp between (datetime(2022-04-06 03:17:40.6338220) .. datetime(2022-04-07 07:03:03.4836465)))
    | where Level == "Error" and EventText !contains "TableNotFoundException" and EventText !contains "FabricManager" and EventText !contains "Cannot access a disposed object." and SourceId !in ("5F76F3E6", "9EE66924")
    | summarize count(), EventText=take_any(EventText) by SourceId, Source
    | order by count_ desc
    // | top 4 by count_

we see that the errors these clusters are facing are all sorts of network, throttling and storage exceptions. So the connector gets such a response from the service, and then immediately gives up and deletes the destination table.

yihezkel commented 2 years ago

We discussed this and refined this further:

Before we do this, we would need to investigate which kinds of exceptions might be thrown to identify which are transient.

This bug is not high priority because the effect of the bug is:

  1. Spark Connector ingestions will fail and need to be retried more often due to transient errors, which can translate into unnecessary delays and resource utilization.
  2. Data is lost only for customers who don’t follow best practices, that is, who don’t catch exceptions and retry as appropriate for their business needs.
  3. I reran the query to see how common this issue is, and only 1 cluster had more than 100 blobs in the last week that couldn’t be ingested because the temp table had been deleted. Hopefully their code caught our failure notification, and retried. So I think we’re safe with even a low priority.

yihezkel commented 2 years ago

We should also document user best practices of catching exceptions and retrying appropriately.
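As a starting point for such documentation, here is a minimal sketch of catching a failed write and retrying with exponential backoff. `write_with_retry` and its parameters are hypothetical; `write` stands for whatever callable performs the Spark write to Kusto in the user's job. Whether a blind retry is safe (for example with respect to duplicate data) depends on the workload, so this is an illustration, not a recommendation.

```python
import time


def write_with_retry(write, max_attempts=3, base_delay_s=1.0,
                     retryable=(Exception,), sleep=time.sleep):
    """Call `write()` until it succeeds or `max_attempts` is reached.

    Waits base_delay_s * 2**(attempt - 1) seconds between attempts and
    re-raises the last exception if every attempt fails.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return write()
        except retryable:
            if attempt == max_attempts:
                raise  # out of attempts: surface the failure to the caller
            sleep(base_delay_s * 2 ** (attempt - 1))
```

A job would wrap its write in a function and pass it in, for example `write_with_retry(lambda: save_to_kusto(df))`, where `save_to_kusto` is a placeholder for the user's own code. Narrowing `retryable` to the exception types the job actually considers recoverable avoids retrying on errors that will never succeed.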