python中multiprocessing.pool的apply_async()和pandas如何结合使用?
我有一个DataFrame表"all_urls_df",有十亿条数据,一共两列(title列,url列)。对url列进行检测,在DF表中新增status列,如果检测返回值是200,status列值为1,否则为0。
我想采用多进程的方式来实现该功能,部分代码如下:
def pandas_data_washed(df):
pool = multiprocessing.Pool(processes=35)
all_urls_df = df
all_urls_df["status"] = all_urls_df.apply(lambda x: 1 if pool.apply_async(check_url_ok, (x.url,)) else 0, axis=1)
pool.close()
pool.join()
def check_url_ok(url):
"""检测连接是否可用"""
useragent = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.75 Safari/537.36"
# print("正在检查URL %s" % url)
try:
# sleep_random() # 数据量多了以后,等待耗费时间太长,后期用代理池+多线程+等待
result = requests.get(url, headers={"User-Agent": useragent}, timeout=(5, 5))
if result.status_code == 200:
print("访问正常 %s" % url)
return True
else:
print("访问超时 %s" % url)
return False
except requests.exceptions.ConnectionError as e:
# print("URL %s 访问超时" % url)
# print("URL访问超时")
print("访问超时 %s" % url)
return False
--------------------------------------------------------
现在遇到的问题如下,如果我用pool.apply,"all_urls_df"的status列出来的结果是正确的,但是数据是逐条检测,无法实现进程池效果;如果我用pool.apply_async,进程池效果是出来了,但是status列的结果却全部都是1。请问我的代码是哪里写错了呢,该如何调整呢?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论