How to fix a socket.timeout: timed out error



I have a large dataframe of about 9 million addresses. I am trying to clean the addresses by standardizing common spelling mistakes and shorthand that I have in a dictionary. I have written the following function to perform this task:

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def address_replace(address):
    class Xlator(dict):
        """ All-in-one multiple-string-substitution class """
        def _make_regex(self):
            """ Build re object based on the keys of the current dictionary """
            return re.compile(r'\b' + r'\b|\b'.join(map(re.escape, self.keys())) + r'\b')

        def __call__(self, match):
            """ Handler invoked for each regex match """
            return self[match.group(0)]

        def xlat(self, text):
            """ Translate text, returns the modified text. """
            return self._make_regex().sub(self, text)

    if (address == '') or (address is None):
        result = ''
    else:
        # error_dict maps each misspelling/shorthand to its standardized form
        xlat = Xlator(error_dict)
        result = xlat.xlat(address)

    return result

address_replaceUDF = udf(address_replace, StringType())

I have tested the function (address_replace) with a simple line of text and it produces the desired changes.
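For example, a quick check along these lines demonstrates the substitutions (the error_dict entries below are invented purely for illustration; the real dictionary holds the actual misspellings and corrections):

error_dict = {'stret': 'street', 'rd': 'road'}  # hypothetical sample entries

print(address_replace('12 main stret'))  # expected: '12 main street'
print(address_replace('45 oak rd'))      # expected: '45 oak road'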

Then I use my udf to create a new column in my dataframe as follows (first removing blank and null addresses):

from pyspark.sql.functions import col

df_add = df_address.select('ID', 'RESIDENTIAL_ADDRESS')\
                   .where(~(col('RESIDENTIAL_ADDRESS') == ''))\
                   .where(col('RESIDENTIAL_ADDRESS').isNotNull())
print(df_add.count())

df_add = df_add.withColumn('CLEAN_RESIDENTIAL',
                           address_replaceUDF(col('RESIDENTIAL_ADDRESS')))

df_add.select('RESIDENTIAL_ADDRESS', 'CLEAN_RESIDENTIAL')\
      .where('NOT RESIDENTIAL_ADDRESS = CLEAN_RESIDENTIAL')\
      .show(100, truncate=False)

I then want to inspect the results where changes have been made to the data. If I run the above code without the final where clause it produces a result, but with the where clause, I get the following error:

Socket Timeout Error Message

Following the file path in the error leads to this (https://i.sstatic.net/jHeIh.png):

Line 707 in file socket.py

I have tried to increase the memory and buffer size available with the following code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[4]')\
    .config("spark.executor.memory", "30g").config("spark.shuffle.file.buffer", "64k")\
    .config("spark.eventLog.buffer.kb", "200k").getOrCreate()

But this did not resolve the error. This is an important project, so please help me if you can. Thanks.
