在集群中的执行器上运行 python 脚本 [Scala/Spark]

发布于 2025-01-09 04:43:10 字数 1243 浏览 1 评论 0原文

我有 python 脚本:

import sys

for line in sys.stdin:
  print("hello " + line)

我在集群中的工作人员上运行它:

def run(spark: SparkSession) = {

  val data = List("john","paul","george","ringo")

  val dataRDD = sc.makeRDD(data)
  val scriptPath = getClass.getResource("test.py").getPath
  val pipeRDD = dataRDD.pipe(command = "python3 " ++ scriptPath)

  pipeRDD.foreach(println)
}

输出

hello john

hello Ringo

hello george

hello paul

我有几个问题,请告诉我。 我可以在 python 脚本中访问 Spark 会话吗? 或者我可以在 python 脚本中创建一个文件并将其保存到 hdfs 文件系统吗?

事实上,这就是我想做的 我想在 python 脚本中创建 csv 文件,并将它们保存到 hdfs。

还有一个小问题。 是否可以向工作人员发送命令来安装 python 软件包?

例如:pip install pandas

!UPD: 我对 python 文件做了一些更改。

#!/usr/bin/python
# -*- coding: utf-8 -*-

#import pandas as pd
import sys
import os

for line in sys.stdin:
    with open('readme.csv', 'w') as f:
        f.write('Name,Last Name\nМихаил,Зубенко')

print(os.getcwd() + '/readme.csv')

该文件是在容器内创建的:

在此处输入图像描述

现在我有一个问题。我如何访问该文件?

I have python script:

import sys

for line in sys.stdin:
  print("hello " + line)

And I run it on workers in cluster:

def run(spark: SparkSession) = {

  val data = List("john","paul","george","ringo")

  val dataRDD = sc.makeRDD(data)
  val scriptPath = getClass.getResource("test.py").getPath
  val pipeRDD = dataRDD.pipe(command = "python3 " ++ scriptPath)

  pipeRDD.foreach(println)
}

Output

hello john

hello ringo

hello george

hello paul

I have a few questions, please tell me.
Can I access the spark session in a python script?
Or can I create a file in my python script and save it to the hdfs file system?

Actually, that's what I'm trying to do
I want to create csv files in a python script, and save them to hdfs.

And one more minor question.
Is it possible to send commands to the workers to install python packages?

For example: pip install pandas

!UPD:
I have made some changes to the python file.

#!/usr/bin/python
# -*- coding: utf-8 -*-

#import pandas as pd
import sys
import os

for line in sys.stdin:
    with open('readme.csv', 'w') as f:
        f.write('Name,Last Name\nМихаил,Зубенко')

print(os.getcwd() + '/readme.csv')

The file is created inside the container:

enter image description here

Now I have one question. How do I access this file?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

飘逸的'云 2025-01-16 04:43:10

我可以在 python 脚本中访问 Spark 会话

?不可以使用管道命令。

Python 脚本中的文件

对于初学者,我建议您使用 PySpark 而不是 Scala,假设您“需要”Python。

rdd = sparkContext.parallelize(["john","paul","george","ringo"])
hello = rdd.mapValues(lambda s: "hello " + s)
for s in rdd.collect():
  print(s)

创建一个文件...并将其保存到hdfs文件系统...

或者,您也可以从 Scala 写入 HDFS。根本不清楚为什么需要 Python。 Scala 也可以创建/读取 CSV 文件。

[在worker上]安装python包

是否可能,是的,但是,PySpark 已经在 spark-submit--py-files< 期间使用 ZIP/EGG 文件提供了对此的支持/代码> 参数

例如,熊猫

SparkSQL DataFrames 大部分取代了 Pandas 的需求,你应该使用它们而不是 RDD。您可以使用 Polars 项目在 Spark 原生数据帧和 pandas 之间进行转换

Can I access the spark session in a python script

Not with a piped command, no.

files in Python script

For starters, I suggest you use PySpark rather than Scala, assuming you "need" Python.

rdd = sparkContext.parallelize(["john","paul","george","ringo"])
hello = rdd.mapValues(lambda s: "hello " + s)
for s in rdd.collect():
  print(s)

create a file ... and save it to the hdfs file system ...

Alternatively, you can also write to HDFS from Scala. Not really clear why you need Python at all. Scala can create/read CSV files as well.

install python packages [on workers]

Is it possible, yes, however, PySpark already provides support for this with the use of ZIP/EGG files during spark-submit and --py-files argument

for example, pandas

SparkSQL DataFrames mostly replace the need for Pandas, and you should use them rather than RDDs. You can use Polars project to convert between Spark native dataframes and pandas

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文