How to do several consecutive intersects?

Posted on 2025-02-06 15:16:43


I have a SQL query like below:

select col4, col5 from TableA where col1 = 'x'
intersect
select col4, col5 from TableA where col1 = 'y'
intersect
select col4, col5 from TableA where col1 = 'z'

How can I convert this SQL to its PySpark equivalent?
I can create 3 DataFrames and then intersect them like below:

df1 ==> select col4, col5 from TableA where col1 = 'x'
df2 ==> select col4, col5 from TableA where col1 = 'y'
df3 ==> select col4, col5 from TableA where col1 = 'z'

df_result = df1.intersect(df2)
df_result = df_result.intersect(df3)
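
The three DataFrames would be created roughly like this in PySpark (a sketch, assuming TableA is already loaded as a DataFrame named df):

from pyspark.sql import functions as F

# assumption: `df` is TableA loaded as a DataFrame with col1, col4, col5
df1 = df.filter(F.col('col1') == 'x').select('col4', 'col5')
df2 = df.filter(F.col('col1') == 'y').select('col4', 'col5')
df3 = df.filter(F.col('col1') == 'z').select('col4', 'col5')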

But I feel that's not a good approach to follow if I had more intersect queries.

Also, let's say [x, y, z] is dynamic, meaning it could be like [x, y, z, a, b, .....]

Any suggestions?


Comments (1)

对你的占有欲 2025-02-13 15:16:43


If you want to do several consecutive intersects, there's reduce
available. Put all your dfs in one list and it will intersect them consecutively:

from functools import reduce
dfs = [df1, df2,...]
df = reduce(lambda a, b: a.intersect(b), dfs)
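
If the list of values is dynamic, the same idea can be kept by building the per-value DataFrames in a comprehension first (a sketch, assuming the source DataFrame is df and the values are in a Python list vals):

from functools import reduce
from pyspark.sql import functions as F

vals = ['x', 'y', 'z']  # could be any dynamic list of values
# one filtered DataFrame per value, then intersect them all
dfs = [df.filter(F.col('col1') == v).select('col4', 'col5') for v in vals]
df_result = reduce(lambda a, b: a.intersect(b), dfs)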

But it would be inefficient in your case.


Since all the data comes from the same DataFrame, I would suggest a rework. Instead of splitting df into pieces and then recombining them with intersect, do an aggregation and a filter.

Script (Spark 3.1):

from pyspark.sql import functions as F

vals = ['x', 'y', 'z']
arr = F.array([F.lit(v) for v in vals])
# collect the distinct col1 values seen for each (col4, col5) pair
df = df.groupBy('col4', 'col5').agg(F.collect_set('col1').alias('set'))
# keep only the pairs whose set contains every value in vals
df = df.filter(F.forall(arr, lambda x: F.array_contains('set', x)))
df = df.drop('set')

Test:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 11, 'y'),
     (1, 11, 'y'),
     (1, 11, 'x'),
     (2, 22, 'x'),
     (1, 11, 'z'),
     (4, 44, 'z'),
     (1, 11, 'M')],
    ['col4', 'col5', 'col1'])

vals = ['x', 'y', 'z']
arr = F.array([F.lit(v) for v in vals])
df = df.groupBy('col4', 'col5').agg(F.collect_set('col1').alias('set'))
df = df.filter(F.forall(arr, lambda x: F.array_contains('set', x)))
df = df.drop('set')

df.show()
# +----+----+
# |col4|col5|
# +----+----+
# |   1|  11|
# +----+----+
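
If you are on a Spark version older than 3.1 (which has no F.forall), the same filter can be expressed with array_intersect instead (a sketch using the same vals and arr as above, and assuming vals contains no duplicates):

df = df.groupBy('col4', 'col5').agg(F.collect_set('col1').alias('set'))
# keep only the groups whose distinct col1 values cover every value in vals
df = df.filter(F.size(F.array_intersect(F.col('set'), arr)) == len(vals))
df = df.drop('set')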