How do I subtract one streaming DataFrame from another in PySpark?
I have the following code to ingest streaming data:
stream = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("header", "true") \
    .option("badRecordsPath", bad_records_path) \
    .schema(schema) \
    .load(landing_data_location)
display(stream)
I get a DataFrame like this:
+---+------+--------------------+
| id|amount|                  ts|
+---+------+--------------------+
|  1|    20|2022-06-01 13:58:...|
|  2|    20|2022-06-01 13:58:...|
|  3|    20|2022-06-01 13:58:...|
+---+------+--------------------+
Then I validate the data using a SQL statement:
stream.createOrReplaceTempView("events")
valid_records = spark.sql('select * from events e where e.id<>1')
display(valid_records)
The result looks like this:
+---+------+--------------------+
| id|amount|                  ts|
+---+------+--------------------+
|  2|    20|2022-06-01 13:58:...|
|  3|    20|2022-06-01 13:58:...|
+---+------+--------------------+
Then I try to use subtract to get the remaining rows of the DataFrame:
invalid_records = stream.subtract(valid_records)
display(invalid_records)
The following error occurs:
AnalysisException: Except on a streaming DataFrame/Dataset on the right is not supported;
So my question is: what is the correct way to subtract one streaming DataFrame from another?
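For reference, one possible way to get the rejected rows without calling subtract at all is to filter the same stream twice, once with the validation predicate and once with its negation. The sketch below is only an illustration under assumptions: `split_by_predicate` is a hypothetical helper (not part of PySpark), the predicate is written against the column name `id` rather than the alias `e.id`, and rows for which the predicate evaluates to NULL are treated as invalid. It relies only on `DataFrame.filter` accepting a SQL expression string.

```python
def split_by_predicate(df, condition):
    """Split `df` into (valid, invalid) using one SQL boolean expression.

    Hypothetical helper for illustration. `df` can be a batch or a
    streaming DataFrame; `condition` is a SQL expression string.
    """
    # Rows for which the condition is TRUE.
    valid = df.filter(f"({condition})")
    # All other rows: the condition is FALSE or NULL.
    # coalesce(..., false) maps a NULL result to FALSE so NOT keeps that row.
    invalid = df.filter(f"NOT coalesce(({condition}), false)")
    return valid, invalid


# Usage with the names from the question, in a notebook where `stream` exists:
# valid_records, invalid_records = split_by_predicate(stream, "id <> 1")
# display(invalid_records)
```

Both results are plain filters of the source stream, so neither side needs an except/subtract against another streaming DataFrame.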