使用Pyspark和HDFS创建新的CSV文件并上传数据

发布于 2025-01-21 17:10:20 字数 44 浏览 1 评论 0原文

我想使用pyspark在HDF中创建一个CSV文件,并在其中放置一些数据。

I want to create a csv file in hdfs using pyspark and put some data in it.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

花心好男孩 2025-01-28 17:10:20

您可以使用写入方法将pyspark数据帧写入HDFS作为CSV。在Spark 2.0+中,您可以直接使用CSV数据源。

from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    {
        "row_nr": 1,
        "payment": "p1",
        "ref1": "c1",
        "ref2": "c1a",
        "original_ref": "c1",
    },
    {
        "row_nr": 2,
        "payment": "p1",
        "ref1": "c1a",
        "ref2": "c1b",
        "original_ref": None,
    }
]

schema = StructType(
    [
        StructField("row_nr", IntegerType()),
        StructField("payment", StringType()),
        StructField("ref1", StringType()),
        StructField("ref2", StringType()),
        StructField("original_ref", StringType()),
    ]
)

df = spark.createDataFrame(data=data, schema=schema)
df.write.csv(path="/my_path/my_file.csv", mode="overwrite")

You can write a pyspark DataFrame to hdfs as csv using the write method. From Spark 2.0+ you can use csv data source directly.

from pyspark.sql.types import IntegerType, StringType, StructField, StructType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = [
    {
        "row_nr": 1,
        "payment": "p1",
        "ref1": "c1",
        "ref2": "c1a",
        "original_ref": "c1",
    },
    {
        "row_nr": 2,
        "payment": "p1",
        "ref1": "c1a",
        "ref2": "c1b",
        "original_ref": None,
    }
]

schema = StructType(
    [
        StructField("row_nr", IntegerType()),
        StructField("payment", StringType()),
        StructField("ref1", StringType()),
        StructField("ref2", StringType()),
        StructField("original_ref", StringType()),
    ]
)

df = spark.createDataFrame(data=data, schema=schema)
df.write.csv(path="/my_path/my_file.csv", mode="overwrite")
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文