databricks-industry-solutions / hls-payer-mrf-sparkstreaming

Spark Structured Streaming for Payer MRF use case

Failure to initialize configuration for storage account <storage_acc>.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key #30

Open · Nishanbuyo opened 2 months ago

Nishanbuyo commented 2 months ago

Changes in JsonMRFSource.scala

    case Some("abfss") =>
      val serviceId = dbutils.secrets.get(scope = "your-secrets-scope", key = "azure-sp-id")
      val serviceCredential = dbutils.secrets.get(scope = "your-secrets-scope", key = "azure-sp-secret")
      val TENANT_ID = "<your-azure-tenant-id>"        // placeholder
      val accountName = "<storage-account-name>"      // placeholder
      val blobEndpoint = s"$accountName.dfs.core.windows.net"
      println("Conf --> setting filesystem to fs.azurebfs.AzureBlobFileSystem")
      hadoopConf.set("fs.azurebfs.impl", "org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem")
      hadoopConf.set(s"fs.azure.account.auth.type.$blobEndpoint", "OAuth")
      hadoopConf.set(s"fs.azure.account.oauth.provider.type.$blobEndpoint", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
      hadoopConf.set(s"fs.azure.account.oauth2.client.id.$blobEndpoint", serviceId)
      hadoopConf.set(s"fs.azure.account.oauth2.client.secret.$blobEndpoint", serviceCredential)
      hadoopConf.set(s"fs.azure.account.oauth2.client.endpoint.$blobEndpoint", s"https://login.microsoftonline.com/$TENANT_ID/oauth2/token")

I added this to the JsonMRFSource class to access Azure Blob Storage using a service principal, but I'm getting the error Invalid configuration value detected for fs.azure.account.key.

PySpark code in Databricks

file_location = 'abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/sample.json'

df = spark.readStream.format("payer-mrf") \
           .option("buffersize", 67108864) \
           .option("filesystem", "abfss") \
           .load(file_location)

query = (
    df.writeStream
      .outputMode("append")
      .format("delta")
      .trigger(processingTime="10 seconds")
      .option("truncate", "false")
      .option("checkpointLocation", f"{file_location}_checkpoint")
      .table("db_name.table_name")
)

Error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 27.0 failed 4 times, most recent failure: Lost task 3.3 in stage 27.0 (TID 90) (10.110.139.10 executor 1): Failure to initialize configuration for storage account complydbxscud01.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:52)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:682)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:2077)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:269)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:235)
    at com.databricks.common.filesystem.LokiABFS.initialize(LokiABFS.scala:36)
    at com.databricks.common.filesystem.LokiFileSystem$.$anonfun$getLokiFS$1(LokiFileSystem.scala:149)
    at com.databricks.common.filesystem.FileSystemCache.getOrCompute(FileSystemCache.scala:46)
    at com.databricks.common.filesystem.LokiFileSystem$.getLokiFS(LokiFileSystem.scala:146)
    at com.databricks.common.filesystem.LokiFileSystem.initialize(LokiFileSystem.scala:211)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3611)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:554)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at com.databricks.labs.sparkstreaming.jsonmrf.JsonMRFRDD$.$anonfun$fs$1(JsonChunks.scala:107)
    at com.databricks.labs.sparkstreaming.jsonmrf.JsonMRFRDD.compute(JsonChunks.scala:38)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: Invalid configuration value detected for fs.azure.account.key
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.ConfigurationBasicValidator.validate(ConfigurationBasicValidator.java:49)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator.validate(Base64StringConfigurationBasicValidator.java:40)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.validateStorageAccountKey(SimpleKeyProvider.java:71)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:49)
    ... 46 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3900)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3822)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3809)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3809)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1685)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1670)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1670)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:4146)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4058)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:4046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:54)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1348)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1336)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:3005)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2988)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$2(FileFormatWriter.scala:352)
    at org.apache.spark.sql.catalyst.MetricKeyUtils$.measureMs(MetricKey.scala:727)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$1(FileFormatWriter.scala:350)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:395)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:322)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$1(FileFormatWriter.scala:303)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
    at com.databricks.sql.transaction.tahoe.commands.WriteIntoDeltaCommand.run(WriteIntoDeltaCommand.scala:114)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$14(TransactionalWriteEdge.scala:700)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:387)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:691)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:276)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:628)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$writeFiles$1(TransactionalWriteEdge.scala:700)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag(DeltaLogging.scala:225)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.withOperationTypeTag$(DeltaLogging.scala:212)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.withOperationTypeTag(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$2(DeltaLogging.scala:164)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:294)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:292)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:163)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:573)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:669)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:687)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags(UsageLogging.scala:664)
    at com.databricks.logging.UsageLogging.recordOperationWithResultTags$(UsageLogging.scala:582)
    at com.databricks.spark.util.PublicDBLogging.recordOperationWithResultTags(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.UsageLogging.recordOperation(UsageLogging.scala:573)
    at com.databricks.logging.UsageLogging.recordOperation$(UsageLogging.scala:542)
    at com.databricks.spark.util.PublicDBLogging.recordOperation(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.spark.util.PublicDBLogging.recordOperation0(DatabricksSparkUsageLogger.scala:68)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:150)
    at com.databricks.spark.util.UsageLogger.recordOperation(UsageLogger.scala:68)
    at com.databricks.spark.util.UsageLogger.recordOperation$(UsageLogger.scala:55)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.recordOperation(DatabricksSparkUsageLogger.scala:109)
    at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:429)
    at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:408)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:162)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:152)
    at com.databricks.sql.transaction.tahoe.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:142)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.$anonfun$recordWriteFilesOperation$1(TransactionalWriteEdge.scala:354)
    at com.databricks.sql.acl.CheckPermissions$.$anonfun$trusted$2(CheckPermissions.scala:2185)
    at com.databricks.sql.util.ThreadLocalTagger.withTag(QueryTagger.scala:62)
    at com.databricks.sql.util.ThreadLocalTagger.withTag$(QueryTagger.scala:59)
    at com.databricks.sql.util.QueryTagger$.withTag(QueryTagger.scala:132)
    at com.databricks.sql.acl.CheckPermissions$.trusted(CheckPermissions.scala:2185)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.recordWriteFilesOperation(TransactionalWriteEdge.scala:353)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles(TransactionalWriteEdge.scala:386)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles$(TransactionalWriteEdge.scala:380)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles(TransactionalWriteEdge.scala:776)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWriteEdge.writeFiles$(TransactionalWriteEdge.scala:766)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWrite.writeFiles(TransactionalWrite.scala:308)
    at com.databricks.sql.transaction.tahoe.files.TransactionalWrite.writeFiles$(TransactionalWrite.scala:305)
    at com.databricks.sql.transaction.tahoe.OptimisticTransaction.writeFiles(OptimisticTransaction.scala:161)
    at com.databricks.sql.transaction.tahoe.sources.DeltaSink.$anonfun$addBatchWithStatusImpl$4(DeltaSink.scala:207)
    at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:531)
    at com.databricks.sql.transaction.tahoe.sources.DeltaSink.addBatchWithStatusImpl(DeltaSink.scala:207)
    at com.databricks.sql.transaction.tahoe.sources.DeltaSink.addBatchWithStatus(DeltaSink.scala:153)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.addBatch(MicroBatchExecution.scala:1246)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:1471)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$9(SQLExecution.scala:387)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:691)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:276)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:628)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:1457)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:321)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:1457)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$6(MicroBatchExecution.scala:771)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.handleDataSourceException(MicroBatchExecution.scala:1825)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$5(MicroBatchExecution.scala:771)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:1793)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$4(MicroBatchExecution.scala:767)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:321)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$3(MicroBatchExecution.scala:727)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
    at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612)
    at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491)
    at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489)
    at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:84)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:721)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1(MicroBatchExecution.scala:680)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$1$adapted(MicroBatchExecution.scala:680)
    at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:83)
    at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:71)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.runOneBatch(TriggerExecutor.scala:128)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:141)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStreamWithListener(MicroBatchExecution.scala:680)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:458)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$2(StreamExecution.scala:447)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1175)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:398)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:426)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:216)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:424)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:418)
    at com.databricks.spark.util.PublicDBLogging.withAttributionContext(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:472)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:455)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags(DatabricksSparkUsageLogger.scala:27)
    at com.databricks.spark.util.PublicDBLogging.withAttributionTags0(DatabricksSparkUsageLogger.scala:72)
    at com.databricks.spark.util.DatabricksSparkUsageLogger.withAttributionTags(DatabricksSparkUsageLogger.scala:172)
    at com.databricks.spark.util.UsageLogging.$anonfun$withAttributionTags$1(UsageLogger.scala:491)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:603)
    at com.databricks.spark.util.UsageLogging$.withAttributionTags(UsageLogger.scala:612)
    at com.databricks.spark.util.UsageLogging.withAttributionTags(UsageLogger.scala:491)
    at com.databricks.spark.util.UsageLogging.withAttributionTags$(UsageLogger.scala:489)
    at org.apache.spark.sql.execution.streaming.StreamExecution.withAttributionTags(StreamExecution.scala:84)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:378)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$3(StreamExecution.scala:282)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$2(StreamExecution.scala:282)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:281)
Caused by: Failure to initialize configuration for storage account complydbxscud01.dfs.core.windows.net: Invalid configuration value detected for fs.azure.account.keyInvalid configuration value detected for fs.azure.account.key
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:52)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:682)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:2077)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:269)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:235)
    at com.databricks.common.filesystem.LokiABFS.initialize(LokiABFS.scala:36)
    at com.databricks.common.filesystem.LokiFileSystem$.$anonfun$getLokiFS$1(LokiFileSystem.scala:149)
    at com.databricks.common.filesystem.FileSystemCache.getOrCompute(FileSystemCache.scala:46)
    at com.databricks.common.filesystem.LokiFileSystem$.getLokiFS(LokiFileSystem.scala:146)
    at com.databricks.common.filesystem.LokiFileSystem.initialize(LokiFileSystem.scala:211)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3611)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:554)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at com.databricks.labs.sparkstreaming.jsonmrf.JsonMRFRDD$.$anonfun$fs$1(JsonChunks.scala:107)
    at com.databricks.labs.sparkstreaming.jsonmrf.JsonMRFRDD.compute(JsonChunks.scala:38)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:201)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:186)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:151)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:145)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:958)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:105)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:961)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:853)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: Invalid configuration value detected for fs.azure.account.key
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.ConfigurationBasicValidator.validate(ConfigurationBasicValidator.java:49)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator.validate(Base64StringConfigurationBasicValidator.java:40)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.validateStorageAccountKey(SimpleKeyProvider.java:71)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider.getStorageAccountKey(SimpleKeyProvider.java:49)
    ... 46 more
zavoraad commented 1 month ago

Hi @Nishanbuyo ,

This is an issue with connecting Databricks to Azure Blob Storage. If you can access abfss in a notebook, the same spark.conf settings should work for this jar (you can do this directly in a notebook; there is no need to update the jar).
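
For example, setting the OAuth properties at the session level in a notebook would look roughly like the sketch below (shown in Scala here). This is a minimal, hedged example following the standard ADLS Gen2 service-principal setup; the secret scope, secret keys, and account/tenant placeholders are assumptions, not values from this repo.

    // Session-level ABFS OAuth configuration in a Databricks notebook (Scala).
    // The secret scope/keys and the <...> placeholders are assumptions.
    val accountName = "<storage-account-name>"
    val tenantId    = "<your-azure-tenant-id>"
    val endpoint    = s"$accountName.dfs.core.windows.net"

    spark.conf.set(s"fs.azure.account.auth.type.$endpoint", "OAuth")
    spark.conf.set(s"fs.azure.account.oauth.provider.type.$endpoint",
      "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set(s"fs.azure.account.oauth2.client.id.$endpoint",
      dbutils.secrets.get(scope = "your-secrets-scope", key = "azure-sp-id"))
    spark.conf.set(s"fs.azure.account.oauth2.client.secret.$endpoint",
      dbutils.secrets.get(scope = "your-secrets-scope", key = "azure-sp-secret"))
    spark.conf.set(s"fs.azure.account.oauth2.client.endpoint.$endpoint",
      s"https://login.microsoftonline.com/$tenantId/oauth2/token")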

The workaround I've seen has been to mount the directory and read from the mount point.
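
For reference, that mount-based workaround would look roughly like this (a hedged sketch; the container, storage account, tenant, secret scope, and mount point are placeholders/assumptions):

    // Mount the ADLS Gen2 container once with service-principal credentials,
    // then read through the mount point instead of the abfss:// URI.
    val configs = Map(
      "fs.azure.account.auth.type" -> "OAuth",
      "fs.azure.account.oauth.provider.type" ->
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
      "fs.azure.account.oauth2.client.id" ->
        dbutils.secrets.get(scope = "your-secrets-scope", key = "azure-sp-id"),
      "fs.azure.account.oauth2.client.secret" ->
        dbutils.secrets.get(scope = "your-secrets-scope", key = "azure-sp-secret"),
      "fs.azure.account.oauth2.client.endpoint" ->
        "https://login.microsoftonline.com/<your-azure-tenant-id>/oauth2/token"
    )

    dbutils.fs.mount(
      source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
      mountPoint = "/mnt/payer-mrf",
      extraConfigs = configs
    )

    // e.g. spark.readStream.format("payer-mrf").load("/mnt/payer-mrf/sample.json")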

I'll leave this issue open, but it's broadly related to connecting Azure Gen2 storage to Databricks Spark.

Nishanbuyo commented 1 month ago

Hi @zavoraad, I can access abfss in a notebook, i.e. the file can be read directly, but when the same file path is passed through the library, it gives an authentication error.

Mounting the directory fixes the issue, but that's not an option for our project.

zavoraad commented 1 month ago

Hi @Nishanbuyo,

The error is specifically happening when the executor tries to open the file to read it.

The filesystem gets instantiated differently in the unreleased jar: from the file Path instead of from the Hadoop conf.
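
To illustrate the difference, the two instantiation styles look roughly like this in generic Hadoop terms (a sketch, not the repo's actual code; the path is a placeholder):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val hadoopConf = new Configuration()

    // From the Hadoop conf: resolves whatever fs.defaultFS points at.
    val fsFromConf: FileSystem = FileSystem.get(hadoopConf)

    // From the file Path: resolves the filesystem for the URI's own scheme (abfss, s3a, ...).
    val path = new Path("abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/sample.json")
    val fsFromPath: FileSystem = path.getFileSystem(hadoopConf)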

Can you see if the error still persists using the newer jar? Note it's running Spark 3.4.1.

tlibs313 commented 1 week ago

Good evening. I'm not sure where to put this, but I was able to update your base code to Scala 2.12.18 and Spark 3.5.0. This is a genius package, and I used it to get familiar with Scala/Spark programming. I wanted to share, but I am in no way a Scala developer and was therefore not keen to fork or update this repo directly. The main update was that the RowEncoder needed to be converted to ExpressionEncoder[Row]. That, and in the notebook code for the streamWriter, change .table('...') to .toTable('...').
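
For anyone attempting the same upgrade, the encoder change might look roughly like the sketch below. This is an assumption based on the Spark 3.5 encoder API, not the repo's actual code, and the schema shown is a placeholder:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Placeholder schema standing in for the source's real output schema.
    val schema = StructType(Seq(StructField("json_payload", StringType)))

    // Spark 3.4.x and earlier:
    // val encoder = RowEncoder(schema)

    // Spark 3.5.x: build an ExpressionEncoder[Row] directly from the schema.
    val encoder: ExpressionEncoder[Row] = ExpressionEncoder(schema)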