Spark write (Parquet) to on-premises HDFS takes a very long time
Context:
I have a very simple scala/spark job where I READ data from a table in a Microsoft SQL Server relational database through SPARK JDBC READ. The read data goes to a DataFrame and then I use "df.write.option("compression", "snappy").parquet(outputDir)" to write these data to an on-premises HDFS.
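For reference, a minimal, self-contained sketch of the pipeline described above; the JDBC URL, table name, credentials and output path are hypothetical placeholders, not the real job's values:

import org.apache.spark.sql.SparkSession

object JdbcToParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("jdbc-to-parquet") // placeholder name
      .getOrCreate()

    // Read the table from SQL Server over JDBC. Without partitionColumn /
    // lowerBound / upperBound / numPartitions options, the result is a
    // single-partition DataFrame.
    val df = spark.read
      .format("jdbc")
      .option("url", "jdbc:sqlserver://dbhost:1433;databaseName=mydb") // hypothetical
      .option("dbtable", "dbo.my_table")                               // hypothetical
      .option("user", "user")                                          // hypothetical
      .option("password", "password")                                  // hypothetical
      .load()

    // Write the DataFrame to HDFS as snappy-compressed Parquet.
    df.write
      .mode("overwrite")
      .option("compression", "snappy")
      .parquet("/data/output/my_table") // hypothetical output path
  }
}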
Problem:
The table is very small (less than 1 GB in size), but it takes 2.2 hours to read it and write it to HDFS.
I already tried disabling the write, and the read alone was really fast; in other words, the write is the problem here.
Attempts:
- Already tried with a big (92 GB) and a small (900 MB) dataset; the time to write was very similar
- Already tried changing the committer algorithm version from 1 to 2: "spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2" (a sketch of how these settings are applied appears after this list)
- Already tried removing the Parquet _SUCCESS file creation: "mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"
- I'm already using "spark.serializer", "org.apache.spark.serializer.KryoSerializer"
Spark Submit Configs:
"spark.executor.instances": 20
"spark.sql.shuffle.partitions": 1000,
"spark.sql.parquet.compression.codec": "snappy",
"spark.port.maxRetries": 1000,
"spark.sql.broadcastTimeout": 36000,
"spark.executor.extraJavaOptions": "-XX:+UseG1GC",
"spark.driver.maxResultSize": "32G",
"spark.yarn.maxAppAttempts": 1,
"spark.dynamicAllocation.enabled": "false",
"spark.driver.cores": 1,
"spark.driver.memory": "20G",
"spark.driver.memoryOverhead": "2G",
"spark.executor.cores": 4,
"spark.executor.memory": "10G",
"spark.executor.memoryOverhead": "1G",
"spark.network.timeout": 360000,
"spark.executor.heartbeatInterval": 288000,
"spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version": 2,
"spark.hadoop.mapred.output.committer.class": "org.apache.hadoop.mapred.DirectFileOutputCommitter",
"spark.hadoop.mapreduce.use.directfileoutputcommitter": "true",
"spark.hadoop.mapreduce.use.parallelmergepaths": "true",
"mapreduce.fileoutputcommitter.marksuccessfuljobs": "false"
Some code details:
Method that writes the data to HDFS
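// saveMode and source.sourceOutputDir come from the surrounding job code (not shown)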
df.write
.mode(saveMode)
.option("compression","snappy")
.parquet(source.sourceOutputDir)
Versions:
- Scala 2.11.2
- Spark 2.4.0
Observations:
- There is no JOIN in the code
- It only reads into a DataFrame and writes to HDFS