如何使用Spark结构化流(Python)生成一个小时长的镶木式文件?

发布于 2025-01-18 05:14:49 字数 668 浏览 4 评论 0原文

我想每小时生成镶木地板文件,其中包含该小时内收到的所有信息,以便使用 Spark NLP 进行进一步处理。

我有来自 Kafka 的流数据,当我 writeStream 时,我将触发处理时间设置为一小时,它只会生成许多小 parquet 文件。我读过将合并设置为1加上触发器会生成一个大的镶木地板文件,但它仍然给我许多小的镶木地板文件。

我还读到,人们也可以设置最小行数,但我收到的行数每小时都会变化。

这就是我编写流的方式:

df.writeStream \
  .format("parquet") \
  .option("checkpointLocation", "s3a://datalake-twitter-app/spark_checkpoints/") \
  .option("path", "s3a://datalake-twitter-app/raw_datalake/") \
  .trigger(processingTime='60 minutes') \
  .start()

知道如何使用 Spark 结构化流编写一小时长的镶木地板文件,其中包含给定时间内收到的所有信息吗?也许我应该使用不同的东西?

我认为也可能最好专注于从其他 Spark 进程上的 S3 存储桶中读取文件,并读取前一小时内的文件。

I want to generate parquet files every hour with all the information received during that hour for further processing using Spark NLP.

I have streaming data coming from Kafka and when I writeStream I set the trigger processing time to one hour, it just generates many small parquet files. I've read that setting coalesce to 1 plus the trigger would generate a big parquet file but it still gives me many small parquet files.

I've also read that one can also set the minimum number of rows too, but the amount of rows I receive changes from hour to hour.

This is how I'm writing my stream:

df.writeStream \
  .format("parquet") \
  .option("checkpointLocation", "s3a://datalake-twitter-app/spark_checkpoints/") \
  .option("path", "s3a://datalake-twitter-app/raw_datalake/") \
  .trigger(processingTime='60 minutes') \
  .start()

Any idea on how I can write one hour long parquet files with all the information received during the given hour using Spark Structured Streaming? Maybe I should use something different?

I thought that it could also be that is better to focus on the reading files from the S3 bucket on the other Spark process, and read the files within the previous hour.

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文