如何使用 pyspark filestream 读取过去一小时内上传的新文件?

发布于 2025-01-12 18:58:46 字数 576 浏览 4 评论 0原文

我正在尝试读取目录中可用的最新文件(例如过去一小时内的新文件)并加载该数据。我正在尝试使用 pyspark 结构化流媒体。我尝试过 Spark Streaming 的 maxFileAge 选项,但无论选项中指定的时间如何,它仍然会加载目录中的所有文件。

spark.readStream\
.option("maxFileAge", "1h")\
.schema(cust_schema)\
    .csv(upload_path) \
    .withColumn("closing_date", get_date_udf_func(input_file_name()))\
    .writeStream.format('parquet') \
    .trigger(once=True) \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', write_path) \
    .start()

上面是我尝试过的代码,但无论时间如何,它都会加载所有可用文件。请指出我在这里做错了什么..

I am trying to read latest files(say new files in last one hour) available in a directory and load that data . I am trying with pyspark structured streaming. i have tried maxFileAge option of spark streaming, but still it is loading all the files in the diretory, regardless of time specified in the option.

spark.readStream\
.option("maxFileAge", "1h")\
.schema(cust_schema)\
    .csv(upload_path) \
    .withColumn("closing_date", get_date_udf_func(input_file_name()))\
    .writeStream.format('parquet') \
    .trigger(once=True) \
    .option('checkpointLocation', checkpoint_path) \
    .option('path', write_path) \
    .start()

Above is the code that i tried, but it will load all available files regardless of time . Please point out what i am doing wrong here ..

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

我的黑色迷你裙 2025-01-19 18:58:46

抱歉来得太晚了。

.option("maxFileAge", "1h") 仅在创建 checkpointLocation 之后才有效。

示例:

第一次运行,阅读所有内容。
第二次运行,检查 checkpointLocation 是否已创建,然后按 maxFileAge 过滤并将这些文件添加到 checkpointLocation 中。

您可以仅读取带有基本路径的 readstream 中的文件来创建单个检查点位置,然后 maxage 参数将在下一批中起作用。

spark.readStream\
.option("maxFileAge", "1h")\
.option("basePath", upload_path)\
.schema(cust_schema)\
    .csv(upload_path + "/partition1=xxxx/partition2=yyyy/") \
...

Sorry to being too late.

.option("maxFileAge", "1h") will only work if after the checkpointLocation was created.

Example:

Run first time, read everything.
Run second time, check if checkpointLocation is created then filter by maxFileAge and add these files to checkpointLocation.

You can read only a file in readstream with basepath to create a single checkpointLocation, then the maxage param will work in the next batches.

spark.readStream\
.option("maxFileAge", "1h")\
.option("basePath", upload_path)\
.schema(cust_schema)\
    .csv(upload_path + "/partition1=xxxx/partition2=yyyy/") \
...
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文