Running a Python script on executors in a cluster [Scala/Spark]
I have a Python script:
import sys

# Strip the trailing newline so print() does not emit blank lines.
for line in sys.stdin:
    print("hello " + line.rstrip("\n"))
And I run it on the workers in the cluster:
def run(spark: SparkSession) = {
  val data = List("john", "paul", "george", "ringo")
  // Use the SparkContext of the session passed in; `sc` is not in scope here.
  val dataRDD = spark.sparkContext.makeRDD(data)
  val scriptPath = getClass.getResource("test.py").getPath
  val pipeRDD = dataRDD.pipe(command = "python3 " + scriptPath)
  pipeRDD.foreach(println)
}
Output
hello john
hello ringo
hello george
hello paul
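Note that this pattern only works if python3 and test.py are reachable on every executor; getClass.getResource(...).getPath is only a usable filesystem path when the script is an ordinary file on local disk, not packed inside the application JAR. A minimal sketch of one common alternative, assuming the job is launched with spark-submit: ship the script with --files, which places a copy in each executor's working directory, and pipe to it by bare name.

// Sketch, assuming a launch like:
//   spark-submit --files test.py --class MyApp my-app.jar
// --files puts test.py into each executor's working directory,
// so the piped command can refer to it by name instead of a
// driver-local path.
val pipeRDD = dataRDD.pipe("python3 test.py")
pipeRDD.foreach(println)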
I have a few questions, please:
Can I access the Spark session from the Python script?
Or can I create a file in the Python script and save it to the HDFS file system?
Actually, that's what I'm trying to do: I want to create CSV files in the Python script and save them to HDFS.
And one more minor question: is it possible to send a command to the workers to install Python packages?
For example: pip install pandas
UPD:
I have made some changes to the Python file.
#!/usr/bin/python
# -*- coding: utf-8 -*-
#import pandas as pd
import sys
import os

# For every input line, (re)write a local CSV in the worker's current
# directory and print its absolute path back through the pipe.
for line in sys.stdin:
    with open('readme.csv', 'w') as f:
        f.write('Name,Last Name\nМихаил,Зубенко')
    print(os.getcwd() + '/readme.csv')
The file is created inside the container.
Now I have one question. How do I access this file?
Answer:
Not with a piped command, no.
For starters, I suggest you use PySpark rather than Scala, assuming you "need" Python.
Alternatively, you can also write to HDFS from Scala; it's not really clear why you need Python at all, since Scala can create and read CSV files as well.
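As an illustration of that Scala-only route, here is a minimal sketch (the column name and output path are assumptions): build the rows into a DataFrame and let Spark write the CSV straight to HDFS.

import org.apache.spark.sql.SparkSession

def writeCsv(spark: SparkSession): Unit = {
  import spark.implicits._

  // Same sample data as the question, as a one-column DataFrame.
  val df = List("john", "paul", "george", "ringo").toDF("name")

  // Spark writes a directory of part files, not a single CSV file.
  df.write
    .option("header", "true")
    .csv("hdfs:///user/me/names_csv") // hypothetical HDFS path
}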
Is it possible? Yes. However, PySpark already provides support for this through ZIP/EGG files passed during spark-submit with the --py-files argument.
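For example, a sketch of such a launch (the archive and script names are hypothetical):

spark-submit --py-files dependencies.zip my_job.py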
Spark SQL DataFrames mostly replace the need for Pandas, and you should use them rather than RDDs. You can use the Polars project to convert between Spark-native DataFrames and pandas.