Why does Spark's Dataset.map require all parts of the query to be serializable?
I would like to use the Dataset.map function to transform the rows of my dataset. The sample looks like this:
val result = testRepository.readTable(db, tableName)
.map(testInstance.doSomeOperation)
.count()
where testInstance is an instance of a class that extends java.io.Serializable, but testRepository does not. The code throws the following error:
Job aborted due to stage failure.
Caused by: NotSerializableException: TestRepository
Question
I understand why testInstance.doSomeOperation needs to be serializable, since it's inside the map and will be distributed to the Spark workers. But why does testRepository need to be serialized? I don't see why that is necessary for the map. Changing the definition to class TestRepository extends java.io.Serializable solves the issue, but that is not desirable in the larger context of the project.
Is there a way to make this work without making TestRepository serializable, or why is it required to be serializable?
Minimal working example
Here's a full example with the code from both classes that reproduces the NotSerializableException:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

case class MyTableSchema(id: String, key: String, value: Double)

val db = "temp_autodelete"
val tableName = "serialization_test"

class TestRepository extends java.io.Serializable {
  def readTable(database: String, tableName: String): Dataset[MyTableSchema] = {
    spark.table(f"$database.$tableName")
      .as[MyTableSchema]
  }
}

val testRepository = new TestRepository()

class TestClass() extends java.io.Serializable {
  def doSomeOperation(row: MyTableSchema): MyTableSchema = {
    row
  }
}

val testInstance = new TestClass()

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()
1 Answer
The reason is that your map operation is reading from something that already takes place on the executors. If you look at your pipeline:

val result = testRepository.readTable(db, tableName)
  .map(testInstance.doSomeOperation)
  .count()
The first thing you do is testRepository.readTable(db, tableName). If we look inside the readTable method, we see that you are doing a spark.table operation in there. Looking at this method in the API docs, we see the following function signature:

def table(tableName: String): DataFrame

This is not an operation that takes place solely on the driver (imagine reading a file of more than 1 TB while working only on the driver), and it creates a DataFrame, which is itself a distributed dataset. That means the testRepository.readTable(db, tableName) function needs to be distributed, and so your testRepository object needs to be distributed. Hope this helps you!
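For illustration only, here is a minimal sketch (my own, not the asker's Databricks setup) of one way the same pipeline could be arranged in a standalone Spark application so that the non-serializable repository is only ever touched on the driver, and the only thing the map closure captures is a plain serializable function. Names like SerializationSketch and DriverOnlyRepository are hypothetical.

import org.apache.spark.sql.{Dataset, SparkSession}

object SerializationSketch {

  case class MyTableSchema(id: String, key: String, value: Double)

  // Deliberately NOT Serializable: it is only ever used on the driver.
  class DriverOnlyRepository(spark: SparkSession) {
    def readTable(database: String, tableName: String): Dataset[MyTableSchema] = {
      import spark.implicits._
      spark.table(s"$database.$tableName").as[MyTableSchema]
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("serialization-sketch").getOrCreate()
    import spark.implicits._

    // The repository lives on the driver; reading the table returns a
    // distributed Dataset, but the repository itself is never shipped.
    val repo = new DriverOnlyRepository(spark)
    val ds = repo.readTable("temp_autodelete", "serialization_test")

    // A plain function value is the only thing the map closure captures,
    // so nothing non-serializable gets sent to the executors.
    val doSomeOperation: MyTableSchema => MyTableSchema = row => row

    val result = ds.map(doSomeOperation).count()
    println(result)

    spark.stop()
  }
}

The point of this arrangement is simply that the function passed to map does not close over the repository, so Spark's closure cleaner finds nothing non-serializable to ship; whether it applies to the asker's notebook environment, where cells capture the surrounding scope, is a separate question.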