Azure / azure-event-hubs-spark

Enabling Continuous Data Processing with Apache Spark and Azure Event Hubs
Apache License 2.0

Checkpoint write to blob storage using hadoop configuration #674

Open ddevidence opened 1 year ago

ddevidence commented 1 year ago

Bug Report:

Description: A Spark Scala streaming application reads a dataset from Event Hubs and writes the processed dataset to ADLS Gen2. That part of the application (without the Hadoop configuration) works fine with client credentials using the following lib.

The issue is that the Hadoop configuration using the same client credentials does not work for writing checkpoints to Blob storage for the streaming application described above.
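For context, the working part of the pipeline looks roughly like the sketch below. This is illustrative only: the connection string, account, container, paths, and column handling are placeholders, not the actual application code.

```scala
import org.apache.spark.eventhubs.{ConnectionStringBuilder, EventHubsConf}

// Rough sketch of the working read/write path; assumes an existing SparkSession named `spark`.
// ehConnectionString, adlsAccount, containerName, outputPath and checkpointPath are placeholders.
val ehConf = EventHubsConf(ConnectionStringBuilder(ehConnectionString).build)

val events = spark.readStream
  .format("eventhubs")          // azure-event-hubs-spark source
  .options(ehConf.toMap)
  .load()

val query = events
  .selectExpr("CAST(body AS STRING) AS body", "enqueuedTime")
  .writeStream
  .format("parquet")
  .option("path", s"abfss://$containerName@$adlsAccount.dfs.core.windows.net/$outputPath")
  .option("checkpointLocation", checkpointPath) // the checkpoint write is what this report is about
  .start()
```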

Sorry about the long stack trace:

```
23/05/30 20:46:21 ERROR AzureNativeFileSystemStore: Service returned StorageException when checking existence of container XXXXXXXXXXX in account XXXXXXXX.blob.core.windows.net
com.microsoft.azure.storage.StorageException: An unknown failure occurred : Connection reset
    at com.microsoft.azure.storage.StorageException.translateException(StorageException.java:67)
    at com.microsoft.azure.storage.core.ExecutionEngine.executeWithRetry(ExecutionEngine.java:209)
    at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:769)
    at com.microsoft.azure.storage.blob.CloudBlobContainer.exists(CloudBlobContainer.java:756)
    at org.apache.hadoop.fs.azure.StorageInterfaceImpl$CloudBlobContainerWrapperImpl.exists(StorageInterfaceImpl.java:233)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.connectUsingAnonymousCredentials(AzureNativeFileSystemStore.java:892)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.createAzureStorageSession(AzureNativeFileSystemStore.java:1118)
    at org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.initialize(AzureNativeFileSystemStore.java:566)
    at org.apache.hadoop.fs.azure.NativeAzureFileSystem.initialize(NativeAzureFileSystem.java:1423)
    at org.apache.hadoop.fs.DelegateToFileSystem.<init>(DelegateToFileSystem.java:54)
    at org.apache.hadoop.fs.azure.Wasbs.<init>(Wasbs.java:40)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstanceWithCaller(Unknown Source)
    at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
    at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:143)
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:181)
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:266)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:342)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:339)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:339)
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:465)
    at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:316)
    at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.<init>(CheckpointFileManager.scala:357)
    at org.apache.spark.sql.execution.streaming.CheckpointFileManager$.create(CheckpointFileManager.scala:209)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.resolveCheckpointLocation(ResolveWriteToStream.scala:89)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:42)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$$anonfun$apply$1.applyOrElse(ResolveWriteToStream.scala:40)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:31)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:40)
    at org.apache.spark.sql.execution.streaming.ResolveWriteToStream$.apply(ResolveWriteToStream.scala:39)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:270)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:346)
    at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:430)
    at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:365)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:249)
    at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor$.executeStreamProcessor(StreamEvidenceProcessor.scala:290)
    at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor$.main(StreamEvidenceProcessor.scala:304)
    at com.usbank.shieldplatform.evidence.streaming.StreamEvidenceProcessor.main(StreamEvidenceProcessor.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1020)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:192)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:215)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1111)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1120)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

The Hadoop configuration code:

```scala
def createSparkSessionWithHadoopConfigBlobWrite(appName: String): SparkSession = {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName(appName)
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .config("spark.sql.streaming.log.enableTimeStamps", "true")
    .config("spark.sql.streaming.log.malformedEventLogEnabled", "true")
    .config(s"spark.hadoop.fs.azure.account.auth.type.$blobStorageAccount.blob.core.windows.net", "OAuth")
    .config(s"spark.hadoop.fs.azure.account.oauth.provider.type.$blobStorageAccount.blob.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.id.$blobStorageAccount.blob.core.windows.net", clientId)
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.secret.$blobStorageAccount.blob.core.windows.net", clientSecret)
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.endpoint.$blobStorageAccount.blob.core.windows.net", "https://login.microsoftonline.com/XXXXXX-XXXX-XXXX-XXXX-XXXXXXXX/oauth2/token")
    .config("spark.sql.streaming.checkpointLocation", s"wasbs://$containerName@$blobStorageAccount.blob.core.windows.net/$checkPointWritePath")
    .getOrCreate()

  println(s"CHKPOINT Location: wasbs://$containerName@$blobStorageAccount.blob.core.windows.net/$checkPointWritePath")
  spark
}
```
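One observation: the stack trace shows the WASB driver falling back to connectUsingAnonymousCredentials, and the OAuth provider class used above (org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider) belongs to the ABFS driver rather than WASB. A minimal sketch of the same session configured against the ADLS Gen2 endpoint (dfs.core.windows.net) with an abfss:// checkpoint path, reusing the variable names from the snippet above, is shown below. This is an assumption about a possible workaround, not a confirmed fix.

```scala
// Hypothetical variant (assumption, not verified): target the ABFS driver and
// the ADLS Gen2 endpoint instead of the WASB/blob endpoint for OAuth.
def createSparkSessionWithAbfsCheckpoint(appName: String): SparkSession = {
  val dfsEndpoint = s"$blobStorageAccount.dfs.core.windows.net" // ADLS Gen2 endpoint

  SparkSession.builder()
    .master("local[*]")
    .appName(appName)
    .config(s"spark.hadoop.fs.azure.account.auth.type.$dfsEndpoint", "OAuth")
    .config(s"spark.hadoop.fs.azure.account.oauth.provider.type.$dfsEndpoint",
      "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.id.$dfsEndpoint", clientId)
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.secret.$dfsEndpoint", clientSecret)
    .config(s"spark.hadoop.fs.azure.account.oauth2.client.endpoint.$dfsEndpoint",
      "https://login.microsoftonline.com/XXXXXX-XXXX-XXXX-XXXX-XXXXXXXX/oauth2/token")
    // abfss:// checkpoint path on the same account/container
    .config("spark.sql.streaming.checkpointLocation",
      s"abfss://$containerName@$dfsEndpoint/$checkPointWritePath")
    .getOrCreate()
}
```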

yamin-msft commented 1 year ago

Currently, we only support a checkpoint location that is a path in an HDFS-compatible file system. We don't have a firm deadline for this feature yet.

ddevidence commented 1 year ago

Ref: https://learn.microsoft.com/en-us/azure/hdinsight/overview-data-lake-storage-gen2#core-functionality-of-azure-data-lake-storage-gen2

• Access that is compatible with Hadoop: In Azure Data Lake Storage Gen2, you can manage and access data just as you would with a Hadoop Distributed File System (HDFS). The Azure Blob File System (ABFS) driver is available within all Apache Hadoop environments, including Azure HDInsight and Azure Databricks. Use ABFS to access data stored in Data Lake Storage Gen2.

– This reference seems to indicate that ADLS Gen2 supports Hadoop operations, which led to the assumption that the library would support checkpoint writes as part of the Spark hadoopConfiguration.
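As a quick way to test that assumption independently of the streaming checkpoint machinery, the Hadoop FileSystem API can be used directly with the session's Hadoop configuration to check whether the ADLS Gen2 path is reachable over ABFS. The sketch below reuses the variable names from the configuration snippet and is illustrative only.

```scala
import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative check (not from the original report): resolve the path through
// the Hadoop FileSystem API using the same Spark Hadoop configuration.
val checkpointUri =
  s"abfss://$containerName@$blobStorageAccount.dfs.core.windows.net/$checkPointWritePath"

val fs = FileSystem.get(new URI(checkpointUri), spark.sparkContext.hadoopConfiguration)
val reachable = fs.mkdirs(new Path(checkpointUri)) // true if the directory exists or was created
println(s"Checkpoint dir reachable over ABFS: $reachable")
```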

Can you provide insight into whether this would be a significant feature update or something on the lower end of the effort scale? I just want to have this prioritized for the use case at hand.

yamin-msft commented 1 year ago

This will require some work, and I am not able to provide a deadline for it.