AzureBlobFileSystem FileNotFoundException when streaming from a Delta table on ADLS Gen2
When I stream data from a Delta table hosted on Azure Data Lake Storage (ADLS) Gen2, the stream runs for a while before failing with the error below. The error says that the path doesn't exist, but I can see in the storage logs that files are successfully written to and read from that path both before and after the error. It seems safe to say that the path does exist in Azure Storage, despite the exception.
For context:
- I am using Spark 3.1 (PySpark).
- I have a separate stream actively writing data to the Delta table via a ForeachBatch sink (a minimal sketch of the setup follows this list).
- The Delta table is a managed table.
- This happens whether the input and output streams run on the same cluster or on separate clusters.
- I am using Azure Synapse.
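For reference, here is a minimal sketch of the setup, under assumptions: the ABFSS path, the checkpoint locations, and the rate source standing in for the real input are hypothetical, and the managed table is addressed by path rather than by name purely for brevity.

```python
# Minimal sketch of the setup described above. The ABFSS path, checkpoint locations,
# and the rate source standing in for the real input are hypothetical; the managed
# table is addressed by path here purely for brevity.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

table_path = (
    "abfss://workspace@example.dfs.core.windows.net"
    "/synapse/workspaces/example/warehouse/my_table"
)

# Writer: a separate streaming query appends each micro-batch to the Delta table.
def write_batch(batch_df, batch_id):
    batch_df.write.format("delta").mode("append").save(table_path)

(spark.readStream.format("rate").load()   # placeholder streaming source
    .writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", table_path + "/_checkpoints/writer")  # hypothetical
    .start())

# Reader: a second streaming query tails the same Delta table; this is the query
# that eventually fails with the FileNotFoundException on _delta_log.
(spark.readStream.format("delta").load(table_path)
    .writeStream
    .format("console")
    .option("checkpointLocation", table_path + "/_checkpoints/reader")  # hypothetical
    .start())
```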
Fixes I've tried:
- Increasing the batch execution interval from None to 10 seconds (see the sketch after this list). After this change, the query went from failing after ~15 minutes with the error below to failing after a little over an hour.
- Switching to a premium-tier ADLS account (no effect).
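For completeness, this is roughly how the trigger change was applied (a sketch; spark and table_path are as in the earlier snippet). The reader originally had no trigger configured, so micro-batches ran back-to-back; it was switched to a fixed 10-second processing-time trigger.

```python
# Same reader as in the sketch above, but with an explicit 10-second processing-time
# trigger instead of the default (run each micro-batch as soon as the previous finishes).
query = (spark.readStream.format("delta").load(table_path)
    .writeStream
    .format("console")
    .trigger(processingTime="10 seconds")
    .option("checkpointLocation", table_path + "/_checkpoints/reader")  # hypothetical
    .start())
```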
I found one other person with this error, but no solution was provided because the issue was raised with the wrong audience: https://github.com/delta-io/delta/issues/932. Based on that issue, it seems a simple repro is just reading a Spark stream from, and writing a Spark stream to, a Delta table hosted on ADLS Gen2.
How can I pin down the root cause? Are there any Spark or ADLS settings I can change to mitigate this?
22/03/19 02:06:20 ERROR MicroBatchExecution: Query [id = 00f1d866-74a2-42f9-8fb6-c8d1a76e00a6, runId = 902f8480-4dc6-4a7d-aada-bfe3b660d288] terminated with error
java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:408)
at org.apache.spark.sql.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.scala:69)
at org.apache.spark.sql.delta.DeltaLog.getChanges(DeltaLog.scala:227)
at org.apache.spark.sql.delta.sources.DeltaSource.filterAndIndexDeltaLogs$1(DeltaSource.scala:190)
at org.apache.spark.sql.delta.sources.DeltaSource.getFileChanges(DeltaSource.scala:203)
at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame(DeltaSource.scala:117)
at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame$(DeltaSource.scala:112)
at org.apache.spark.sql.delta.sources.DeltaSource.getFileChangesAndCreateDataFrame(DeltaSource.scala:144)
at org.apache.spark.sql.delta.sources.DeltaSource.getBatch(DeltaSource.scala:385)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:486)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:482)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:482)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:207)
at org.apache.hadoop.fs.azurebfs.services.AbfsClient.listPath(AbfsClient.java:231)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:905)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:876)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:858)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:405)
... 37 more
1 Answer
Please check the following points:
- Make sure the identity running the jobs has the Storage Blob Data Contributor role on the storage account (i.e. contributor access to the data lake and the container).
- Otherwise, the cause may be having multiple jobs writing to the same location, where one job is cleaning up while the other is setting up, so they get mixed up.
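If the cleanup race is the culprit, one possible mitigation (not part of the original suggestion, just a common Delta-side knob) is to lengthen the table's retention windows so a slow or lagging streaming reader is less likely to reference log or data files that have already been removed. The table name and intervals below are hypothetical:

```python
# Hedged sketch: widen Delta's retention windows so log cleanup / VACUUM is less likely
# to delete files a concurrent streaming reader still needs. Adjust the table name and
# intervals to your environment; these values are illustrative only.
spark.sql("""
    ALTER TABLE my_table SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 30 days',
        'delta.deletedFileRetentionDuration' = 'interval 7 days'
    )
""")
```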