胶水/火花:过滤一个具有数千条条件的大型动态框架

发布于 2025-02-13 18:17:43 字数 1059 浏览 1 评论 0原文

我正在尝试用数百万行具有数据的数百万行滤波胶胶框架:

id  val ts  
a   1.3 2022-05-03T14:18:00.000Z
a   9.2 2022-05-03T12:18:00.000Z
c   8.2 2022-05-03T13:48:00.000Z

我还有另一个带有数千行的pandas数据框架:

id  start_ts                        end_ts  
a   2022-05-03T14:00:00.000Z    2022-05-03T14:18:00.000Z
a   2022-05-03T11:38:00.000Z    2022-05-03T12:18:00.000Z
c   2022-05-03T13:15:00.000Z    2022-05-03T13:48:00.000Z

我想在时间序列的动态框架中过滤所有行中的所有行,条件它们具有相同的> idts位于start_tsend_ts之间。

我当前的方法太慢了,无法解决问题:

我首先在pandas_df上迭代,并将多个过滤的胶水动态帧存储到一个数组中

dfs=[]
for index, row in pandas_df.iterrows():
    df = Filter.apply(ts_dynamicframe, f=lambda x: ((row['start_ts'] <= x['ts'] <= row['end_ts']) and x['id'] == index))
    dfs.append(df)

,然后将所有动态框架结合在一起。

df = dfs[0]

dfs.pop(0)

for _df in dfs: 
    df = df.union(_df)

物业化需要太长而永远不会完成

print("Count: ", df.count())

I am trying to filter a timeseries glue dynamic frame with millions of rows having data:

id  val ts  
a   1.3 2022-05-03T14:18:00.000Z
a   9.2 2022-05-03T12:18:00.000Z
c   8.2 2022-05-03T13:48:00.000Z

I have another pandas dataframe with thousands of rows:

id  start_ts                        end_ts  
a   2022-05-03T14:00:00.000Z    2022-05-03T14:18:00.000Z
a   2022-05-03T11:38:00.000Z    2022-05-03T12:18:00.000Z
c   2022-05-03T13:15:00.000Z    2022-05-03T13:48:00.000Z

I want to filter all the rows in the time series dynamic frame having condition they have the same id and the ts is between start_ts and end_ts.

My current approach is too slow to solve the problem:

I am first iterating over the pandas_df and storing multiple filtered glue dynamic frames into an array

dfs=[]
for index, row in pandas_df.iterrows():
    df = Filter.apply(ts_dynamicframe, f=lambda x: ((row['start_ts'] <= x['ts'] <= row['end_ts']) and x['id'] == index))
    dfs.append(df)

and then unioning all the dynamicframes together.

df = dfs[0]

dfs.pop(0)

for _df in dfs: 
    df = df.union(_df)

the materialization takes too long and never finishes..

print("Count: ", df.count())

What could be more efficient approaches to solving this problem with spark/glue?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

从﹋此江山别 2025-02-20 18:17:44

使用范围加入

数据

df=spark.createDataFrame([('a' ,  1.3 ,'2022-05-03T14:18:00.000Z'),
    ('a' ,  9.2, '2021-05-03T12:18:00.000Z'),
    ('c' ,  8.2, '2022-05-03T13:48:00.000Z')],
    ('id' , 'val', 'ts'  ))
    
    
    
     
    df1=spark.createDataFrame([('a' ,  '2022-05-03T14:00:00.000Z'  ,  '2022-05-03T14:18:00.000Z'),
    ('a'  , '2022-05-03T11:38:00.000Z' , '2022-05-03T12:18:00.000Z'),
    ('c' ,  '2022-05-03T13:15:00.000Z' ,  '2022-05-03T13:48:00.000Z')],
    ('id' , 'start_ts' , 'end_ts' ))
    
    #Convert to timestamp if not yet converted
    df= df.withColumn('ts', to_timestamp('ts'))
    df1= df1.withColumn('start_ts', to_timestamp('start_ts')).withColumn('end_ts', to_timestamp('end_ts'))

解决

#convert to SQL table
df1.createOrReplaceTempView('df1')
df.createOrReplaceTempView('df')


#Use range between
spark.sql("SELECT * FROM df,df1 WHERE df.id= df1.id AND df.ts BETWEEN df1.start_ts and df1.end_ts").show()

方案结果

+---+---+-------------------+---+-------------------+-------------------+
| id|val|                 ts| id|           start_ts|             end_ts|
+---+---+-------------------+---+-------------------+-------------------+
|  a|1.3|2022-05-03 14:18:00|  a|2022-05-03 14:00:00|2022-05-03 14:18:00|
|  c|8.2|2022-05-03 13:48:00|  c|2022-05-03 13:15:00|2022-05-03 13:48:00|
+---+---+-------------------+---+-------------------+-------------------+

Use a range join

Data

df=spark.createDataFrame([('a' ,  1.3 ,'2022-05-03T14:18:00.000Z'),
    ('a' ,  9.2, '2021-05-03T12:18:00.000Z'),
    ('c' ,  8.2, '2022-05-03T13:48:00.000Z')],
    ('id' , 'val', 'ts'  ))
    
    
    
     
    df1=spark.createDataFrame([('a' ,  '2022-05-03T14:00:00.000Z'  ,  '2022-05-03T14:18:00.000Z'),
    ('a'  , '2022-05-03T11:38:00.000Z' , '2022-05-03T12:18:00.000Z'),
    ('c' ,  '2022-05-03T13:15:00.000Z' ,  '2022-05-03T13:48:00.000Z')],
    ('id' , 'start_ts' , 'end_ts' ))
    
    #Convert to timestamp if not yet converted
    df= df.withColumn('ts', to_timestamp('ts'))
    df1= df1.withColumn('start_ts', to_timestamp('start_ts')).withColumn('end_ts', to_timestamp('end_ts'))

Solution

#convert to SQL table
df1.createOrReplaceTempView('df1')
df.createOrReplaceTempView('df')


#Use range between
spark.sql("SELECT * FROM df,df1 WHERE df.id= df1.id AND df.ts BETWEEN df1.start_ts and df1.end_ts").show()

outcome

+---+---+-------------------+---+-------------------+-------------------+
| id|val|                 ts| id|           start_ts|             end_ts|
+---+---+-------------------+---+-------------------+-------------------+
|  a|1.3|2022-05-03 14:18:00|  a|2022-05-03 14:00:00|2022-05-03 14:18:00|
|  c|8.2|2022-05-03 13:48:00|  c|2022-05-03 13:15:00|2022-05-03 13:48:00|
+---+---+-------------------+---+-------------------+-------------------+
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文