Azure / azure-sdk-for-java

This repository is for active development of the Azure SDK for Java. For consumers of the SDK we recommend visiting our public developer docs at https://docs.microsoft.com/java/azure/ or our versioned developer docs at https://azure.github.io/azure-sdk-for-java.

sdk/cosmos/azure-cosmos-spark_3-5_2-12 - "_corrupt_record" column isn't getting populated when reading from Cosmos DB #40241

Closed SurbhiSingi closed 3 months ago

SurbhiSingi commented 3 months ago

We encountered an issue while attempting to enable error handling for bad or corrupt data records (specifically, those resulting from datatype mismatches) when reading from one of our Cosmos DB containers in a Databricks notebook. The aim is to capture these problematic records in a designated column, such as "_corrupt_record". However, upon implementation, we noticed that the "_corrupt_record" column remains unpopulated when a datatype mismatch occurs; instead, the mismatched fields are simply set to null.

Interestingly, when we use the same logic with a different data source, changing the format from "cosmos.oltp" to "csv" and reading a CSV file containing corrupt data, the "_corrupt_record" column populates correctly, with the mismatched fields set to null (as with Cosmos). We are reaching out to clarify whether this behaviour is expected when reading from Cosmos DB, or whether there is something we are overlooking in our implementation.
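For reference, a minimal sketch of the CSV comparison (the file path and all field names other than "_corrupt_record" are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Illustrative schema; a non-integer "age" value triggers a datatype mismatch.
csv_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

df_csv = spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(csv_schema) \
    .load("/mnt/sample/corrupt_data.csv")  # hypothetical path

display(df_csv)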

Databricks Runtime: 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)
Cosmos Spark library: com.azure.cosmos.spark:azure-cosmos-spark_3-4_2-12:4.26.1

The issue persists with the latest Databricks Runtime (14.3 LTS) and the latest Spark Cosmos connector (com.azure.cosmos.spark:azure-cosmos-spark_3-5_2-12:4.30.0).

Here is the code snippet:

df = spark.read.format("cosmos.oltp") \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .options(**cosmos_conn_read_config) \
    .schema(schema) \
    .load()

display(df)

Note: schema has “_corrupt_record” as one of the columns.
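For completeness, a sketch of such a schema (only "_corrupt_record" is required verbatim; the other fields are illustrative):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Illustrative document fields plus the designated corrupt-record column.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])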

github-actions[bot] commented 3 months ago

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @kushagraThapar @pjohari-ms @TheovanKraay.

FabianMeiswinkel commented 3 months ago

Supporting the "columnNameOfCorruptRecord" option is not a generic Spark connector capability; only the file-based Spark data sources (csv/json) support it. This mechanism can therefore only be used with those connectors, not with any other DataSource V2 connector such as the Cosmos DB connector.

SurbhiSingi commented 3 months ago

@FabianMeiswinkel - Thank you for the clarification. Will this feature be available in future versions of the Spark Cosmos DB connector?

FabianMeiswinkel commented 3 months ago

Right now, we have no plans to add this feature. Schema inference is optional, so workloads that use widely varying schemas can always disable schema inference.
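If capturing malformed records is still needed, one possible pattern is a sketch along these lines: disable schema inference so the connector returns the raw item JSON, then parse it with from_json, which accepts the same PERMISSIVE/columnNameOfCorruptRecord options as the file-based json source. This assumes the raw JSON lands in a string column named "_rawBody" when spark.cosmos.read.inferSchema.enabled is false; verify both names against the connector documentation.

from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Illustrative target schema; include "_corrupt_record" so PERMISSIVE mode
# has somewhere to put records that fail to parse.
target_schema = StructType([
    StructField("id", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

# Read raw items; with schema inference disabled the connector returns the
# full item JSON as a string column (assumed here to be "_rawBody").
raw_df = spark.read.format("cosmos.oltp") \
    .options(**cosmos_conn_read_config) \
    .option("spark.cosmos.read.inferSchema.enabled", "false") \
    .load()

# from_json honors PERMISSIVE mode and columnNameOfCorruptRecord, so records
# with datatype mismatches keep their original text in "_corrupt_record".
parsed_df = raw_df.select(
    from_json(
        col("_rawBody"),
        target_schema,
        {"mode": "PERMISSIVE", "columnNameOfCorruptRecord": "_corrupt_record"},
    ).alias("parsed")
).select("parsed.*")

display(parsed_df)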

SurbhiSingi commented 3 months ago

Thank you for your response. Please inform us if this feature is added in the future and, if possible, provide tentative timelines for its availability. Thank you!