Filtering a dataframe based on time ranges from another dataframe


I have two PySpark dataframes: df1 has 10,000 rows and df2 has 500 million rows. I am trying to filter df2 based on the values in df1 by iterating over df1 one row at a time.
For example, corresponding to the first row in df1, I need to keep only the rows of df2 whose timestamp falls between that row's begin_time and end_time, and whose node_name appears in that row's node_name array.
Repeat this for all the rows of df1.

+-------------+-------------------+-------------------+---------------------------------------------------------------------------+
|      id     |begin_time         |end_time           |node_name                                                                  |
+-------------+-------------------+-------------------+---------------------------------------------------------------------------+
|   1182      |2021-02-01 00:01:00|2021-02-01 00:02:20|[x24n15, x1117, x1108, x1113, x11n02, x11n03, x11n11, x11n32]              |
|   1183      |2021-02-01 00:02:40|2021-02-01 00:03:50|[x28n02, x1112, x1109, x1110]                                              |
|   1184      |2021-02-01 00:04:10|2021-02-01 00:07:10|[x32n10, x34n13, x13n16, x32n09, x28n01]                                   |
|   1185      |2021-02-01 00:05:00|2021-02-01 00:06:30|[x50n09, x50n08]                                                           |
|   1186      |2021-02-01 00:07:00|2021-02-01 00:08:20|[x50n08]                                                                   |

The dataframe df2 has 500 million rows and around 100 columns, but for simplicity only 3 are shown:

+-------------------+---------+----------+
|timestamp          |node_name|sensor_val|
+-------------------+---------+----------+
|2021-02-01 00:01:00|x24n15   |23.5      |
|2021-02-01 00:01:00|x24n15   |23.5      |
|2021-02-01 00:01:00|x24n16   |23.5      |
|2021-02-01 00:01:00|x24n17   |23.5      |
|2021-02-01 00:01:00|x24n18   |23.5      |
|2021-02-01 00:01:00|x24n19   |23.5      |
|2021-02-01 00:01:00|x24n20   |23.5      |
|2021-02-01 00:01:01|x24n15   |23.5      |
|2021-02-01 00:01:01|x24n15   |23.5      |
|2021-02-01 00:01:01|x24n16   |23.5      |
|2021-02-01 00:01:01|x24n17   |23.5      |
|2021-02-01 00:01:01|x24n18   |23.5      |
|2021-02-01 00:01:01|x24n19   |23.5      |
|2021-02-01 00:01:01|x24n20   |23.5      |
+-------------------+---------+----------+

The resultant table that I need:

+-------------------+---------+----------+----+
|timestamp          |node_name|sensor_val|id  |
+-------------------+---------+----------+----+
|2021-02-01 00:01:00|x24n15   |23.5      |1182|
|2021-02-01 00:01:00|x24n15   |23.5      |1182|
|2021-02-01 00:01:00|x24n16   |23.5      |1182|
|2021-02-01 00:01:00|x24n17   |23.5      |1183|
|2021-02-01 00:01:00|x24n18   |23.5      |1183|
|2021-02-01 00:01:00|x24n19   |23.5      |1184|
|2021-02-01 00:01:00|x24n20   |23.5      |1184|
|2021-02-01 00:01:01|x24n15   |23.5      |1184|
|2021-02-01 00:01:01|x24n15   |23.5      |1184|
|2021-02-01 00:01:01|x24n16   |23.5      |1185|
|2021-02-01 00:01:01|x24n17   |23.5      |1185|
|2021-02-01 00:01:01|x24n18   |23.5      |1185|
|2021-02-01 00:01:01|x24n19   |23.5      |1185|
|2021-02-01 00:01:01|x24n20   |23.5      |1185|
+-------------------+---------+----------+----+
many more rows (on the order of 100s of millions)

What I have tried so far is something like the following, but it is very slow and probably cannot be used in production.

from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

SeriesAppend = []
data_collect = df1.rdd.toLocalIterator()
for row in data_collect:
    bt = row.begin_time
    et = row.end_time
    # Build a quoted node list and substitute this row's time window into the query.
    nodes = ", ".join("'{}'".format(n) for n in row.node_name)
    temp_row_df = spark.sql(
        "SELECT timestamp_t, node_name, sensor_val FROM df2_table "
        "WHERE timestamp_t >= '{}' AND timestamp_t < '{}' "
        "AND node_name IN ({})".format(bt, et, nodes))
    temp_row_df = temp_row_df.withColumn("id", F.lit(row.id))
    SeriesAppend.append(temp_row_df)
df_series = reduce(DataFrame.unionAll, SeriesAppend)

Comments (2)

失而复得 2025-02-13 08:21:03


Option 1 - probably inefficient on big tables like yours, but good on smaller ones:

# Explode the node_name array so each df1 row carries a single node name.
df1_2 = df1.withColumn('node_name', F.explode('node_name'))
# Range join: keep df2 rows whose timestamp falls within [begin_time, end_time]
# for a matching node_name; 'left' keeps unmatched df2 rows with a null id.
df = df2.join(df1_2, [df1_2.node_name == df2.node_name, df1_2.begin_time <= df2.timestamp, df2.timestamp <= df1_2.end_time], 'left')
df = df.select(df2['*'], 'id')
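
Since the exploded df1 is still small (a few tens of thousands of rows), a broadcast hint may help here: it tells Spark to ship df1_2 to every executor instead of shuffling the 500-million-row df2 for the range join. This is a minimal sketch on top of Option 1, not part of the original answer, using pyspark.sql.functions.broadcast; whether it actually helps depends on executor memory and the plan Spark chooses:

from pyspark.sql.functions import broadcast

df1_2 = df1.withColumn('node_name', F.explode('node_name'))
# Mark the small exploded df1 as broadcastable so the 500M-row df2 is not shuffled;
# Spark then evaluates the node and time-range conditions locally per partition.
df = df2.join(
    broadcast(df1_2),
    [df1_2.node_name == df2.node_name,
     df1_2.begin_time <= df2.timestamp,
     df2.timestamp <= df1_2.end_time],
    'left')
df = df.select(df2['*'], 'id')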

Option 2

# Explode node_name, then expand each [begin_time, end_time] interval into one
# row per second so the range condition becomes an ordinary equi-join key.
df1_2 = (df1
    .withColumn('node_name', F.explode('node_name'))
    .withColumn('unix_time', F.explode(F.sequence(F.unix_timestamp('begin_time'), F.unix_timestamp('end_time'))))
)
df2_2 = df2.withColumn('unix_time', F.unix_timestamp('timestamp'))
# Equi-join on (node_name, unix_time); assumes whole-second timestamps in df2.
df = df2_2.join(df1_2, ['node_name', 'unix_time'], 'left')
df = df.select(df2['*'], 'id')
離人涙 2025-02-13 08:21:03


You should prep the dataframes so that you can perform a join between the two of them; a sketch of these steps follows the list below.

  1. Create a new column in df1, say "time_gap", which holds the [begin_time - end_time] interval.

  2. Collect all possible combinations of "time_gap" values (i.e. [begin_time - end_time]) into an array.

  3. In df1, explode the "node_name" column into "node_name2" so that each row corresponds to one node_name2.

  4. In df2, create a new column called "time_gaps" and find the appropriate time_gap value(s) that the timestamp on each row fits into.

  5. In df2, explode "time_gaps" into a column, say "time_gap", so that each row corresponds to one time_gap.

  6. You can now join df1 X df2 on [df1.node_name2 == df2.node_name, df1.time_gap == df2.time_gap] and then perform your required tasks.
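
Below is a minimal PySpark sketch of steps 1-6, not part of the original answer, assuming df1 and df2 have the columns shown in the question and Spark 3.1+ (for F.filter). The names time_gap, time_gaps and node_name2 are the illustrative names from the list above, and an inner join is used so only df2 rows that fall inside some interval survive:

from pyspark.sql import functions as F

# Steps 1 & 3: one (interval, node) pair per row of df1.
df1_prep = (df1
    .withColumn("time_gap", F.struct("begin_time", "end_time"))
    .withColumn("node_name2", F.explode("node_name"))
    .select("id", "node_name2", "time_gap"))

# Step 2: collect the distinct intervals into a literal array (df1 is small).
gaps = [r["time_gap"] for r in df1_prep.select("time_gap").distinct().collect()]
gaps_col = F.array(*[F.struct(F.lit(g["begin_time"]).alias("begin_time"),
                              F.lit(g["end_time"]).alias("end_time"))
                     for g in gaps])

# Steps 4 & 5: keep only the intervals each timestamp falls into, then explode.
df2_prep = (df2
    .withColumn("time_gaps", F.filter(
        gaps_col,
        lambda g: (F.col("timestamp") >= g["begin_time"]) &
                  (F.col("timestamp") <= g["end_time"])))
    .withColumn("time_gap", F.explode("time_gaps"))
    .drop("time_gaps"))

# Step 6: plain equi-join on node name and interval, then drop the helper column.
result = (df2_prep
    .join(df1_prep.withColumnRenamed("node_name2", "node_name"),
          on=["node_name", "time_gap"], how="inner")
    .drop("time_gap"))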
