Join PySpark DataFrames on a substring match
I have two dataframes df1 and df2 somewhat like this:
import pandas as pd
from pyspark.sql import SparkSession  # note: pyspark.sql, not spark.sql
spark = SparkSession.builder.appName("someAppname").getOrCreate()
# df1: the entity names to look up; df2: semicolon-separated alias strings plus an id
df1 = spark.createDataFrame(pd.DataFrame({"entity_nm": ["Joe B", "Donald", "Barack Obama"]}))
df2 = spark.createDataFrame(pd.DataFrame({"aliases": ["Joe Biden; Biden Joe", "Donald Trump; Donald J. Trump", "Barack Obama", "Joe Burrow"], "id": [1, 2, 3, 4]}))
I want to left-join df2 onto df1 based on a string contains match, and it does work when I do it like this:
df_joined = df1.join(df2, df2.aliases.contains(df1.entity_nm), how="left")
That join gives me my desired result:
+------------+--------------------+---+
|   entity_nm|             aliases| id|
+------------+--------------------+---+
|       Joe B|Joe Biden; Biden Joe|  1|
|       Joe B|          Joe Burrow|  4|
|      Donald|Donald Trump; Don...|  2|
|Barack Obama|        Barack Obama|  3|
+------------+--------------------+---+
Problem here: I tried to do this with a list of 60k entity names in df1 and around 6 million aliases in df2, and this approach takes forever until, at some point, my Spark session crashes with memory errors. I'm pretty sure my approach is very naive and far from optimized.
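From what I understand (I may be wrong here), contains() is a non-equi condition, so Spark can't use a hash or sort-merge join and ends up comparing every entity against every alias. A quick way to check what the planner actually does:

# Print the physical plan; with a non-equi condition like contains() I would
# expect to see a BroadcastNestedLoopJoin (or a Cartesian product) here rather
# than a BroadcastHashJoin/SortMergeJoin.
df_joined.explain()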
I've read this blog post, which suggests using a UDF, but I don't have any Scala knowledge and struggle to understand and recreate it in PySpark.
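The closest I can imagine in PySpark is something like the sketch below (just my guess at the idea; the UDF and names are mine, not the blog's), and as far as I can tell it still compares every entity/alias pair, so I doubt it helps:

from pyspark.sql import functions as F
from pyspark.sql.types import BooleanType

# My guess at the "use a UDF" idea, recreated in PySpark: a plain Python
# containment check applied to every entity/alias pair via a cross join.
# Note this gives inner-join semantics (entities without any match are dropped),
# unlike the left join above, and it is still an all-pairs comparison.
contains_udf = F.udf(
    lambda alias, name: name in alias if alias is not None and name is not None else False,
    BooleanType(),
)

df_joined_udf = (
    df1.crossJoin(df2)
       .filter(contains_udf(F.col("aliases"), F.col("entity_nm")))
)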
Any suggestions or help on how to optimize my approach? I need to do tasks like this a lot, so any help would be greatly appreciated.