How to fix a socket.timeout: timed out error
I have a large dataframe of about 9 million addresses. I am trying to clean the addresses by standardizing common spelling mistakes and shorthand that I keep in a dictionary, and I have written the following function to perform this task:
import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def address_replace(address):
    class Xlator(dict):
        """All-in-one multiple-string-substitution class."""
        def _make_regex(self):
            """Build a re object based on the keys of the current dictionary."""
            return re.compile(r'\b' + r'\b|\b'.join(map(re.escape, self.keys())) + r'\b')
        def __call__(self, match):
            """Handler invoked for each regex match."""
            return self[match.group(0)]
        def xlat(self, text):
            """Translate text, returning the modified text."""
            return self._make_regex().sub(self, text)
    if (address == '') or (address is None):
        result = ''
    else:
        xlat = Xlator(error_dict)  # error_dict maps misspelling -> correction
        result = xlat.xlat(address)
    return result

address_replaceUDF = udf(address_replace, StringType())
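One thing worth noting: as written, address_replace constructs a fresh Xlator and recompiles the regex for every row, which is expensive across roughly 9 million addresses. Below is a minimal sketch of a variant that compiles the pattern once per Python worker instead; the name address_replace_fast and the sample dictionary entries are illustrative, not from the original code.

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

error_dict = {'str': 'street', 'rd': 'road'}  # hypothetical sample entries

# Compile once at module import, not once per row.
_pattern = re.compile(r'\b' + r'\b|\b'.join(map(re.escape, error_dict)) + r'\b')

def address_replace_fast(address):
    if not address:  # covers both None and ''
        return ''
    return _pattern.sub(lambda m: error_dict[m.group(0)], address)

address_replace_fastUDF = udf(address_replace_fast, StringType())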
I have tested the function (address_replace) with a simple line of text and it produces the desired changes.
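For reference, a quick driver-side check along those lines might look like this (the error_dict entry here is hypothetical):

error_dict = {'str': 'street'}          # hypothetical entry
print(address_replace('12 main str'))   # prints '12 main street'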
Then I use my udf to create a new column in my dataframe as follows (first removing blank and null addresses):
from pyspark.sql.functions import col

df_add = df_address.select('ID', 'RESIDENTIAL_ADDRESS') \
    .where(~(col('RESIDENTIAL_ADDRESS') == '')) \
    .where(col('RESIDENTIAL_ADDRESS').isNotNull())
print(df_add.count())

df_add = df_add.withColumn('CLEAN_RESIDENTIAL', address_replaceUDF(col('RESIDENTIAL_ADDRESS')))

df_add.select('RESIDENTIAL_ADDRESS', 'CLEAN_RESIDENTIAL') \
    .where('NOT RESIDENTIAL_ADDRESS = CLEAN_RESIDENTIAL').show(100, truncate=False)
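As an aside, the same filter can be written with column expressions rather than a SQL string, and persisting df_add first keeps the UDF from being re-evaluated if the result is inspected more than once. A minimal sketch under those assumptions:

from pyspark.sql.functions import col

df_add = df_add.persist()  # cache so the expensive UDF runs only once per row
df_add.select('RESIDENTIAL_ADDRESS', 'CLEAN_RESIDENTIAL') \
    .where(col('RESIDENTIAL_ADDRESS') != col('CLEAN_RESIDENTIAL')) \
    .show(100, truncate=False)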
I then want to inspect the results where changes have been made to the data. If I run the above code without the final where clause it produces a result, but with the where clause, I get the following error:
If you follow the file path given in the error, it leads you to this: https://i.sstatic.net/jHeIh.png
I have tried to increase the memory and buffer size available with the following code:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[4]') \
    .config('spark.executor.memory', '30g') \
    .config('spark.shuffle.file.buffer', '64k') \
    .config('spark.eventLog.buffer.kb', '200k') \
    .getOrCreate()
But this did not resolve the error. This is an important project, so please help me if you can. Thanks!
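A note on the config above: in local[4] mode the executors run inside the driver JVM, so spark.executor.memory is effectively ignored; the driver heap is what matters, and it has to be configured before that JVM starts. A minimal sketch, assuming no SparkSession is already running:

from pyspark.sql import SparkSession

# In local mode only the driver JVM exists, so size its heap instead.
spark = SparkSession.builder.master('local[4]') \
    .config('spark.driver.memory', '30g') \
    .getOrCreate()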