Why does Spark Dataset.map require all parts of the query to be serializable?


I would like to use the Dataset.map function to transform the rows of my dataset. The sample looks like this:

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()

where testInstance is a class that extends java.io.Serializable, but testRepository does not. The code throws the following error:

Job aborted due to stage failure.
Caused by: NotSerializableException: TestRepository

Question

I understand why testInstance.doSomeOperation needs to be serializable, since it's inside the map and will be distributed to the Spark workers. But why does testRepository need to be serialized? I don't see why that is necessary for the map. Changing the definition to class TestRepository extends java.io.Serializable solves the issue, but that is not desirable in the larger context of the project.
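
To make the capture explicit, the map call above is equivalent to passing an anonymous function that closes over testInstance (a sketch for illustration, not part of the original question):

// Equivalent form of .map(testInstance.doSomeOperation): the method reference
// is eta-expanded into a function that captures testInstance, so testInstance
// (and anything else the closure ends up referencing) must be serializable in
// order to be shipped to the executors.
val result = testRepository.readTable(db, tableName)
  .map((row: MyTableSchema) => testInstance.doSomeOperation(row))
  .count()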

Is there a way to make this work without making TestRepository serializable, or why is it required to be serializable?

Minimal working example

Here's a full example with the code from both classes that reproduces the NotSerializableException:

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
// Assumes a notebook or spark-shell environment where `spark` (the SparkSession)
// and spark.implicits._ are already in scope.

case class MyTableSchema(id: String, key: String, value: Double)
val db = "temp_autodelete"
val tableName = "serialization_test"

// Note: TestRepository deliberately does not extend java.io.Serializable;
// that is what triggers the NotSerializableException below.
class TestRepository {
  def readTable(database: String, tableName: String): Dataset[MyTableSchema] = {
    spark.table(f"$database.$tableName")
      .as[MyTableSchema]
  }
}

val testRepository = new TestRepository()

class TestClass() extends java.io.Serializable {
  def doSomeOperation(row: MyTableSchema): MyTableSchema = {
    row
  }
}

val testInstance = new TestClass()

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()

1 Answer

疑心病 2025-02-18 16:14:22


The reason is that your map operation is reading from something that already takes place on the executors.

If you look at your pipeline:

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()

The first thing you do is testRepository.readTable(db, tableName). If we look inside the readTable method, we see that you are doing a spark.table operation in there. If we look at the API docs, we see the following function signature for this method:

def table(tableName: String): DataFrame

This is not an operation that takes place solely on the driver (imagine reading in a file of more than 1 TB using only the driver), and it creates a DataFrame, which is itself a distributed dataset. That means the testRepository.readTable(db, tableName) call needs to be distributed, and so your testRepository object needs to be distributed as well.
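
One pattern that is often suggested for this kind of error (a sketch for illustration, not part of the original answer, reusing the MyTableSchema, db, tableName and testRepository definitions from the question): keep the readTable call on the driver as before, but pass map a function defined on a standalone serializable object, so the closure that gets shipped does not drag testRepository along with it.

// Sketch of a possible workaround; assumes a notebook or spark-shell where
// `spark` and spark.implicits._ are in scope, plus the definitions above.

// A stateless object holding the row-level logic; serializing a reference
// to it is cheap and does not pull in the repository.
object RowOps extends java.io.Serializable {
  def doSomeOperation(row: MyTableSchema): MyTableSchema = row
}

// readTable still runs on the driver; only the resulting Dataset and the
// RowOps-based closure are distributed.
val rows = testRepository.readTable(db, tableName)

val result = rows
  .map(RowOps.doSomeOperation _)
  .count()

Whether this is enough in a given notebook depends on how the REPL wraps the definitions, so treat it as a starting point rather than a guaranteed fix.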

Hope this helps you!
