delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs
https://delta.io
Apache License 2.0

Path doesn't exist error while streaming from Delta table in Azure Data Lake Gen2 #932

Closed rajasekarv closed 2 years ago

rajasekarv commented 2 years ago

While evaluating Delta for a streaming use case, I came across the following issue.

Data flow:

1. Rate data to Kafka:

val df = spark.readStream.format("rate").option("rowsPerSecond", 10).option("numPartitions", 1).load().selectExpr("to_json(named_struct('timestamp',timestamp,'value',value)) as value")

df.writeStream.format("kafka").option("kafka.bootstrap.servers","***:9094").option("topic", "ratedata").option("checkpointLocation", "stream/ratedataproducer/").start().awaitTermination()

2. Kafka to Delta

3. Delta to console

import org.apache.spark.sql.streaming.Trigger
val df = spark.readStream.format("delta").load("abfss://*@.dfs.core.windows.net/stream/ratedata")

df.createOrReplaceTempView("ratedata")
val wdf = spark.sql("select id, date_format(cast(timestamp as timestamp), 'HH:mm:ss') as sourcetime,date_format(current_timestamp(),'HH:mm:ss') as currenttime from ratedata")
wdf.writeStream.format("console").option("checkpointLocation", "/tmp/ratedataconsumer").outputMode("append").trigger(Trigger.ProcessingTime("1 SECONDS")).start().awaitTermination()

It streams fine for some time and then, at random, throws the following error:

java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://shddatalakesta.dfs.core.windows.net/datalake-uat?upn=false&resource=filesystem&maxResults=500&directory=stream/ratedata/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist."

I have tested this with multiple storage containers and tables, and I am able to reproduce this consistently.
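Step 2 (Kafka to Delta) is listed above without code. A minimal sketch of what such a job could look like follows; it is not the job that was actually run. It assumes the topic and Delta path from steps 1 and 3, a hypothetical checkpoint location, and a schema inferred from the JSON produced in step 1. The consumer query in step 3 also selects an `id` column, which this sketch does not try to reproduce. It is meant for `:paste` mode in spark-shell, where `spark` is already defined.

```scala
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{LongType, StringType, StructType}

// Assumed schema: step 1 writes JSON with `timestamp` and `value`.
// to_json renders the timestamp as a string, so it is read as a string here.
val schema = new StructType()
  .add("timestamp", StringType)
  .add("value", LongType)

// Read the raw Kafka records from the topic used in step 1.
val kafkaDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "***:9094")
  .option("subscribe", "ratedata")
  .load()

// Parse the JSON payload and flatten it into top-level columns.
val parsed = kafkaDf
  .select(from_json(col("value").cast("string"), schema).as("data"))
  .select("data.*")

// Append to the Delta table that step 3 reads from.
parsed.writeStream.format("delta")
  .option("checkpointLocation", "stream/ratedatatodelta/") // hypothetical location
  .start("abfss://*@.dfs.core.windows.net/stream/ratedata")
  .awaitTermination()
```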

scottsand-db commented 2 years ago

Hi @rajasekarv, are you still getting this issue?

From your description it seems like a storage issue, and not specific to Delta.

rajasekarv commented 2 years ago

I can still reproduce the issue. I will raise the issue with Azure and update here.

0xdarkman commented 2 years ago

Has this been resolved? If so, how?

zsxwing commented 2 years ago

@0xdarkman are you hitting the same issue? If so, could you open a new issue with the details? We closed this ticket due to inactivity.

0xdarkman commented 2 years ago

@zsxwing Yes: https://github.com/delta-io/delta/issues/1277

0xdarkman commented 2 years ago

> I can still reproduce the issue. I will raise the issue with Azure and update here.

@rajasekarv Have you managed to solve it?

0xdarkman commented 2 years ago

I am also getting it with another Spark job and another storage account container, one on which I have public access set:

"Caused by: java.io.FileNotFoundException: 10m/per_ip/_delta_log is not found"

rajasekarv commented 2 years ago

Sorry for the delayed response, @0xdarkman. We didn't follow up with the Azure team, even though we should have. We tried the same pipeline with the storage mounted through blobfuse, and it seemed fine that way. So I guess the issue is not in the blob storage itself and might be in the Hadoop file system implementation that Spark uses. It is easily reproducible, so you can try raising it in the Hadoop Jira, or else go with the workaround I mentioned.
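For anyone preparing such a report, one way to check whether the failure shows up outside Delta and Structured Streaming is to repeat a single file system call many times and count how often it fails. The helper below is a hypothetical sketch, not part of Spark, Hadoop, or Delta; the name `countNotFound` and its parameters are made up for illustration.

```scala
import java.io.FileNotFoundException

// Hypothetical helper: runs `op` `attempts` times and returns how many
// of those calls threw java.io.FileNotFoundException.
// Any other exception is not caught and aborts the loop.
def countNotFound(attempts: Int, pauseMs: Long = 0L)(op: () => Unit): Int = {
  var failures = 0
  for (_ <- 1 to attempts) {
    try op()
    catch { case _: FileNotFoundException => failures += 1 }
    // Optional pause between attempts to spread the calls over time.
    if (pauseMs > 0) Thread.sleep(pauseMs)
  }
  failures
}
```

In spark-shell, `op` could be something like `() => fs.listStatus(logPath)`, where `logPath` is an `org.apache.hadoop.fs.Path` pointing at the table's `_delta_log` directory and `fs` comes from `logPath.getFileSystem(spark.sparkContext.hadoopConfiguration)`. The failing request in the error above appears to be a directory listing, so a non-zero count from a plain listing loop would suggest the problem sits below Delta.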

0xdarkman commented 2 years ago

@rajasekarv I opened a ticket with Microsoft and produced data for them. They looked into it and confirmed that it is a known bug on their side.

rajasekarv commented 2 years ago

@0xdarkman Glad to hear. It is a bit surprising to me, though, since blobfuse works fine. Blobfuse uses the Golang SDK underneath, so I assumed the issue was coming either from the Java Azure storage SDK or from the Hadoop file system implementation for Azure storage. If possible, please update here once the issue is resolved for you.

zsxwing commented 2 years ago

@0xdarkman Glad to hear that MSFT figured out the issue. Could you post this information in the open ticket https://github.com/delta-io/delta/issues/1277 ? Thanks!