python中multiprocessing.pool的apply_async()和pandas如何结合使用?

发布于 2022-09-13 00:34:57 字数 1317 浏览 26 评论 0

我有一个DataFrame表"all_urls_df",有十亿条数据,一共两列(title列,url列)。对url列进行检测,在DF表中新增status列,如果检测返回值是200,status列值为1,否则为0。
我想采用多进程的方式来实现该功能,部分代码如下:

def pandas_data_washed(df):
pool = multiprocessing.Pool(processes=35)
all_urls_df = df
all_urls_df["status"] = all_urls_df.apply(lambda x: 1 if pool.apply_async(check_url_ok, (x.url,)) else 0, axis=1)
pool.close()
pool.join()


def check_url_ok(url):
    """检测连接是否可用"""
    useragent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36"
    # print("正在检查URL %s" % url)

    try:
        # sleep_random()  # 数据量多了以后,等待耗费时间太长,后期用代理池+多线程+等待
        result = requests.get(url, headers={"User-Agent": useragent}, timeout=(5, 5))
        if result.status_code == 200:

            print("访问正常 %s" % url)
            return True
        else:
            print("访问超时 %s" % url)
            return False
    except requests.exceptions.ConnectionError as e:
        # print("URL %s 访问超时" % url)
        # print("URL访问超时")
        print("访问超时 %s" % url)
        return False

--------------------------------------------------------
现在遇到的问题如下,如果我用pool.apply,"all_urls_df"的status列出来的结果是正确的,但是数据是逐条检测,无法实现进程池效果;如果我用pool.apply_async,进程池效果是出来了,但是status列的结果却全部都是1。请问我的代码是哪里写错了呢,该如何调整呢?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文