AzureBlobFileSystem FileNotFoundException when streaming from a Delta table on ADLS Gen2


When I stream data from a Delta table hosted on Azure Data Lake Storage (ADLS) Gen2, the stream runs for a while before failing with the error below. The error says that the path doesn't exist, but the storage logs show files being written to and read from that path successfully both before and after the error. It seems safe to say that the path does exist in Azure Storage, despite the exception.

For context:

  • I am using Spark 3.1 (PySpark).
  • I have a separate stream actively writing data to the Delta table via a foreachBatch sink (a minimal sketch of the setup follows this list).
  • The delta table is a managed table.
  • This happens whether the input and output streams run on the same cluster or on separate clusters.
  • I am using Azure Synapse.
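
For reference, here is a minimal sketch of that setup, with a hypothetical placeholder path modeled on the error message and a rate source standing in for the real input; it is an approximation, not the actual job code:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("delta-stream-repro").getOrCreate()

    # Hypothetical placeholder modeled on the path in the error message.
    table_path = "abfss://workspace@example.dfs.core.windows.net/synapse/workspaces/example/warehouse/my_table"

    # Writer stream: a separate query that appends to the Delta table via foreachBatch.
    def write_batch(batch_df, batch_id):
        batch_df.write.format("delta").mode("append").save(table_path)

    writer = (
        spark.readStream.format("rate").load()  # stand-in for the real input source
        .writeStream
        .foreachBatch(write_batch)
        .option("checkpointLocation", "/tmp/checkpoints/writer")
        .start()
    )

    # Reader stream: tails the same Delta table; this is the query that fails.
    reader = (
        spark.readStream.format("delta").load(table_path)
        .writeStream.format("console")
        .option("checkpointLocation", "/tmp/checkpoints/reader")
        .start()
    )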

Fixes I've tried:

  1. Increasing the trigger interval from the default (no explicit trigger) to 10 seconds; see the sketch after this list. After this, the query went from failing after ~15 minutes with the error below to failing after a little over an hour.
  2. Switching to a premium tier ADLS account (no effect).
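
The change in fix 1 amounts to the following, sketched with the same hypothetical placeholders as above:

    # Before: no explicit trigger, so micro-batches ran back-to-back.
    # After: an explicit 10-second processing-time trigger.
    reader = (
        spark.readStream.format("delta").load(table_path)
        .writeStream
        .trigger(processingTime="10 seconds")
        .format("console")
        .option("checkpointLocation", "/tmp/checkpoints/reader")
        .start()
    )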

I found one other person with this error, but no solution was provided, since it was raised with the wrong audience: https://github.com/delta-io/delta/issues/932. Based on that issue, it seems a simple repro is just one Spark stream reading from, and another writing to, a Delta table hosted on ADLS Gen2.

How can I pin down the root cause? Are there any Spark or ADLS settings I can change to mitigate this?

22/03/19 02:06:20 ERROR MicroBatchExecution: Query [id = 00f1d866-74a2-42f9-8fb6-c8d1a76e00a6, runId = 902f8480-4dc6-4a7d-aada-bfe3b660d288] terminated with error
java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:408)
    at org.apache.spark.sql.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.scala:69)
    at org.apache.spark.sql.delta.DeltaLog.getChanges(DeltaLog.scala:227)
    at org.apache.spark.sql.delta.sources.DeltaSource.filterAndIndexDeltaLogs$1(DeltaSource.scala:190)
    at org.apache.spark.sql.delta.sources.DeltaSource.getFileChanges(DeltaSource.scala:203)
    at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame(DeltaSource.scala:117)
    at org.apache.spark.sql.delta.sources.DeltaSourceBase.getFileChangesAndCreateDataFrame$(DeltaSource.scala:112)
    at org.apache.spark.sql.delta.sources.DeltaSource.getFileChangesAndCreateDataFrame(DeltaSource.scala:144)
    at org.apache.spark.sql.delta.sources.DeltaSource.getBatch(DeltaSource.scala:385)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$3(MicroBatchExecution.scala:486)
    at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:27)
    at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
    at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:27)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$2(MicroBatchExecution.scala:482)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:482)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: Operation failed: "The specified path does not exist.", 404, GET, https://example.dfs.core.windows.net/workspace?upn=false&resource=filesystem&maxResults=5000&directory=synapse/workspaces/example/warehouse/my_table/_delta_log&timeout=90&recursive=false, PathNotFound, "The specified path does not exist. RequestId:e8c6fb8e-101f-00cb-5c35-3b717e000000 Time:2022-03-19T02:06:20.8352277Z"
    at org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation.execute(AbfsRestOperation.java:207)
    at org.apache.hadoop.fs.azurebfs.services.AbfsClient.listPath(AbfsClient.java:231)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:905)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:876)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:858)
    at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:405)
    ... 37 more


Answer (囍笑, 2025-01-22 21:54:24):


Please check the points below:

  • Make sure you have the required permissions: you will need the Storage Blob Data Contributor role on the storage account (contributor access to the data lake and the container).
  • Try restarting the cluster and clearing the cache.
  • Recreating the cluster can be another troubleshooting step, to make sure everything is set up correctly.
  • The cause can also be a transient network fault in Azure.
  • Try changing the container's access level to anonymous access.
  • Also double-check the path via the Hadoop filesystem (a sketch for this follows the list).
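
A quick way to double-check the path from the cluster is to call the same Hadoop filesystem API that the stream uses. This is a hedged sketch using PySpark's JVM gateway (an internal API), with a placeholder path modeled on the error message:

    # Placeholder path taken from the error message; substitute your own.
    log_path = ("abfss://workspace@example.dfs.core.windows.net"
                "/synapse/workspaces/example/warehouse/my_table/_delta_log")

    jvm = spark.sparkContext._jvm                                # internal py4j gateway
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    path = jvm.org.apache.hadoop.fs.Path(log_path)
    fs = path.getFileSystem(hadoop_conf)

    print(fs.exists(path))              # should print True if the path resolves
    for status in fs.listStatus(path):  # the same listStatus call that returned 404
        print(status.getPath())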

Otherwise, the cause can be multiple jobs writing to the same table, where one is cleaning up while another is setting up and the two get mixed up.
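
If the race is between the writer's Delta log cleanup and the reader's log listing, one possible mitigation (an assumption, not a confirmed fix for this case) is to lengthen the log retention so cleanup lags well behind any reader; the table name here is a placeholder:

    # Hypothetical mitigation: keep Delta log files around longer so concurrent
    # log cleanup is less likely to remove entries a streaming reader is listing.
    # delta.logRetentionDuration is a documented Delta Lake table property.
    spark.sql("""
        ALTER TABLE my_table
        SET TBLPROPERTIES ('delta.logRetentionDuration' = 'interval 30 days')
    """)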

Note

  1. To avoid the error to some extent, make sure your jobs are not writing to the same table simultaneously.
  2. Use the most recent version of Spark that you can.

References:

  1. Databricks error while trying to create delta table on ADLS Gen2 - Stack Overflow
  2. azure databricks - the Dataset/DataFrame - Stack Overflow
  3. python - FileNotFoundException - Stack Overflow