Filtering a dataframe based on time ranges in another dataframe
I have two PySpark dataframes: df1 has 10,000 rows and df2 has 500 million rows. I am trying to filter df2 based on the values in df1 by iterating over df1, one row at a time.
For example, for the first row in df1 I need to filter the rows of df2 so that the timestamp field of df2 lies between that row's begin_time and end_time, and so that the node_name field of df2 is one of the entries in the node_name array of that df1 row. This has to be repeated for every row of df1.
df1 looks like this:
+-------------+-------------------+-------------------+---------------------------------------------------------------------------+
| id |begin_time |end_time |node_name |
+-------------+-------------------+-------------------+---------------------------------------------------------------------------+
| 1182 |2021-02-01 00:01:00|2021-02-01 00:02:20|[x24n15, x1117, x1108, x1113, x11n02, x11n03, x11n11, x11n32] |
| 1183 |2021-02-01 00:02:40|2021-02-01 00:03:50|[x28n02, x1112, x1109, x1110] |
| 1184 |2021-02-01 00:04:10|2021-02-01 00:07:10|[x32n10, x34n13, x13n16, x32n09, x28n01] |
| 1185 |2021-02-01 00:05:00|2021-02-01 00:06:30|[x50n09, x50n08] |
| 1186 |2021-02-01 00:07:00|2021-02-01 00:08:20|[x50n08] |
The dataframe df2 has 500 million rows and around 100 columns, but for simplicity only 3 columns are shown:
timestamp | node_name | sensor_val |
---|---|---|
2021-02-01 00:01:00 | x24n15 | 23.5 |
2021-02-01 00:01:00 | x24n15 | 23.5 |
2021-02-01 00:01:00 | x24n16 | 23.5 |
2021-02-01 00:01:00 | x24n17 | 23.5 |
2021-02-01 00:01:00 | x24n18 | 23.5 |
2021-02-01 00:01:00 | x24n19 | 23.5 |
2021-02-01 00:01:00 | x24n20 | 23.5 |
2021-02-01 00:01:01 | x24n15 | 23.5 |
2021-02-01 00:01:01 | x24n15 | 23.5 |
2021-02-01 00:01:01 | x24n16 | 23.5 |
2021-02-01 00:01:01 | x24n17 | 23.5 |
2021-02-01 00:01:01 | x24n18 | 23.5 |
2021-02-01 00:01:01 | x24n19 | 23.5 |
2021-02-01 00:01:01 | x24n20 | 23.5 |
The resultant table that I need:
timestamp | nodes | sensor_val | id |
---|---|---|---|
2021-02-01 00:01:00 | x24n15 | 23.5 | 1182 |
2021-02-01 00:01:00 | x24n15 | 23.5 | 1182 |
2021-02-01 00:01:00 | x24n16 | 23.5 | 1182 |
2021-02-01 00:01:00 | x24n17 | 23.5 | 1183 |
2021-02-01 00:01:00 | x24n18 | 23.5 | 1183 |
2021-02-01 00:01:00 | x24n19 | 23.5 | 1184 |
2021-02-01 00:01:00 | x24n20 | 23.5 | 1184 |
2021-02-01 00:01:01 | x24n15 | 23.5 | 1184 |
2021-02-01 00:01:01 | x24n15 | 23.5 | 1184 |
2021-02-01 00:01:01 | x24n16 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n17 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n18 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n19 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n20 | 23.5 | 1185 |
many more rows (on the order of hundreds of millions) | | | |
What I have tried so far is something like this, but it is very slow and probably cannot be used in production.
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

SeriesAppend = []
for row in df1.rdd.toLocalIterator():
    bt = row.begin_time
    et = row.end_time
    # Quote this row's node names for the IN (...) clause
    nodes = ", ".join("'{}'".format(n) for n in row.node_name)
    temp_row_df = spark.sql(
        "SELECT timestamp_t, node_name, sensor_val FROM df2_table "
        "WHERE timestamp_t >= '{}' AND timestamp_t < '{}' "
        "AND node_name IN ({})".format(bt, et, nodes)
    )
    # Tag the filtered rows with this df1 row's id
    SeriesAppend.append(temp_row_df.withColumn("id", F.lit(row.id)))
df_series = reduce(DataFrame.unionAll, SeriesAppend)
2 Answers
Option 1 - probably inefficient on a table as big as yours, but fine on smaller ones:
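A sketch of the kind of direct conditional join this option suggests, assuming the column names from the sample data (this is an interpretation, not the original snippet); the lack of an equi-join key is what tends to make it slow on a 500-million-row table:

import pyspark.sql.functions as F

# Join df2 directly against df1 on the time-range and array-membership conditions
result = (
    df2.alias("d2")
    .join(
        df1.alias("d1"),
        F.expr(
            "d2.timestamp >= d1.begin_time AND d2.timestamp <= d1.end_time "
            "AND array_contains(d1.node_name, d2.node_name)"
        ),
    )
    .select("d2.timestamp", "d2.node_name", "d2.sensor_val", "d1.id")
)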
Option 2:
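One plausible, more scalable shape for this option (an assumption on my part, not the answerer's code) is to explode df1.node_name so the join gains an equi-key on the node name, and to broadcast the still-small df1 side:

import pyspark.sql.functions as F

# Hypothetical variant: explode df1.node_name so the join has an equi-key,
# then broadcast the small df1 side; the time range stays as a join filter.
df1_exploded = df1.select(
    "id", "begin_time", "end_time", F.explode("node_name").alias("node_name")
)

result = (
    df2.alias("d2")
    .join(
        F.broadcast(df1_exploded).alias("d1"),
        (F.col("d2.node_name") == F.col("d1.node_name"))
        & F.col("d2.timestamp").between(F.col("d1.begin_time"), F.col("d1.end_time")),
    )
    .select("d2.timestamp", "d2.node_name", "d2.sensor_val", "d1.id")
)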
You should prepare the dataframes so that you can perform a join between the two:
1. In df1, create a new column, say "time_gap", holding the [begin_time - end_time] value.
2. Collect all possible "time_gap" values (i.e. [begin_time - end_time]) into an array.
3. In df1, explode the "node_name" column into "node_name2" so that each row corresponds to a single node_name2.
4. In df2, create a new column called "time_gaps" and find the time_gap value(s) that each row's timestamp fits into.
5. In df2, explode "time_gaps" into a column, say "time_gap", so that each row corresponds to a single time_gap.
6. You can now join df1 and df2 on [df1.node_name2 == df2.node_name, df1.time_gap == df2.time_gap] and perform your required tasks; a sketch of these steps follows.
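A minimal PySpark sketch of the steps above, assuming the column names from the sample data (timestamp, node_name, sensor_val in df2; id, begin_time, end_time, node_name in df1); here a broadcast range join against the small table of distinct windows stands in for the "find the matching time_gap(s) and explode" steps:

import pyspark.sql.functions as F

# Steps 1 and 3: one row in df1 per (time window, node name)
df1_prepped = (
    df1
    .withColumn("time_gap", F.struct("begin_time", "end_time"))  # step 1
    .withColumn("node_name2", F.explode("node_name"))            # step 3
    .select("id", "time_gap", "node_name2")
)

# Step 2: the (small) set of distinct time windows
df_gaps = df1.select(F.struct("begin_time", "end_time").alias("gap")).distinct()

# Steps 4 and 5: attach to each df2 row every window its timestamp falls into;
# the broadcast range join plays the role of "find and explode the time_gaps"
df2_prepped = df2.join(
    F.broadcast(df_gaps),
    df2["timestamp"].between(df_gaps["gap"].getField("begin_time"),
                             df_gaps["gap"].getField("end_time")),
    "inner",
)

# Step 6: plain equi-join on node name and time window
result = (
    df2_prepped.join(
        df1_prepped,
        (df2_prepped["node_name"] == df1_prepped["node_name2"])
        & (df2_prepped["gap"] == df1_prepped["time_gap"]),
    )
    .select("timestamp", "node_name", "sensor_val", "id")
)

Because df1 (and hence the exploded/prepped version of it) is small, broadcasting it keeps the expensive 500-million-row side from being shuffled more than once.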