How do I subtract one streaming DataFrame from another in PySpark?

Posted on 2025-02-06 04:46:59

I have the following code to ingest streaming data:

stream = spark.readStream \
  .format("cloudFiles") \
  .option("cloudFiles.format", "csv") \
  .option("header", "true") \
  .option("badRecordsPath", bad_records_path) \
  .schema(schema) \
  .load(landing_data_location)

display(stream)

And I get a DataFrame like this:

+---+------+--------------------+
| id|amount|                  ts|
+---+------+--------------------+
|  1|    20|2022-06-01 13:58:...|
|  2|    20|2022-06-01 13:58:...|
|  3|    20|2022-06-01 13:58:...|
+---+------+--------------------+

Then I validate the data using a SQL statement:

stream.createOrReplaceTempView("events")
valid_records = spark.sql('select * from events e where e.id<>1')
display(valid_records)

The result will be like this:

+---+------+--------------------+
| id|amount|                  ts|
+---+------+--------------------+
|  2|    20|2022-06-01 13:58:...|
|  3|    20|2022-06-01 13:58:...|
+---+------+--------------------+

Then I try to use subtract to get the remaining rows of the DataFrame:

invalid_records = stream.subtract(valid_records)
display(invalid_records)

The following error occurs:

AnalysisException: Except on a streaming DataFrame/Dataset on the right is not supported;

So my question is: what's the correct way to subtract one streaming DataFrame from another?
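
One possible workaround, offered as a sketch rather than a definitive answer: the error message says that an except (subtract) with a streaming DataFrame on the right-hand side is not supported, and valid_records is itself derived from the stream. Since the invalid rows here are simply the rows that fail the validation predicate, they can be selected with the negated predicate instead of a subtract. The helper name split_valid_invalid is hypothetical, and the sketch assumes the validation rule can be written as a single SQL predicate string such as "id <> 1".

```python
def split_valid_invalid(df, valid_condition):
    """Split df into (valid, invalid) using one SQL predicate.

    `df` is anything with a DataFrame-style `.filter(sql_string)` method,
    for example the streaming DataFrame `stream` from the question.
    """
    valid = df.filter(valid_condition)
    # Negate the predicate instead of calling subtract(). coalesce(..., false)
    # sends rows where the predicate evaluates to NULL (e.g. id IS NULL)
    # to the invalid side instead of dropping them from both sides.
    invalid = df.filter(f"NOT coalesce(({valid_condition}), false)")
    return valid, invalid


# Hypothetical usage with the question's stream:
# valid_records, invalid_records = split_valid_invalid(stream, "id <> 1")
```

Both results are plain filters over the same source, so no set operation between two streaming DataFrames is involved. Whether this fits a more complex validation rule (for example one that spans several rows) is not something the question settles.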
