Python multithreading with requests

Posted 2022-09-12 03:36:04

I'm implementing a simple requirement: checking whether an email address is already registered. I send a request to the target server; a 204 response means not registered and a 409 means already registered. The addresses to check are stored in a txt file, which I read into a list and iterate over. I'm using requests with a proxy, but sending the requests one by one is too slow. How can I run them concurrently with multiple threads?

  1. How do I use multiple threads?
  2. How do I keep multiple threads from checking the same address at the same time, or checking one twice? Ideally every thread would only pick up addresses that haven't been checked yet, and already-checked addresses would never be read again. Should this be done with a queue? (See the queue sketch after the code below.)

Here is the code, with some details redacted.

import requests
import time
import json

class Validator(object):
    def __init__(self):
        self.headers = {
        }

        self.params = (
        )

    def run(self,name,proxies):
        data = '{"emailAddress":"%s"}' %name
        url = ''
        response = requests.post(url=url,headers=self.headers,params=self.params,data=data,timeout=9,proxies=proxies)
        #print(response.status_code)
        if response.status_code == 204:
            result = '{} not registered HTTP status: {}'.format(name, response.status_code)
        elif response.status_code == 409:
            result = '{} already registered HTTP status: {}'.format(name, response.status_code)
        else:
            result = '{} check error HTTP status: {}'.format(name, response.status_code)
        #print(result)
        return result

# Read the accounts to be checked into a list
def get_list():
    with open('unvalidated.txt', 'r') as f:
        email_list = [i.strip() for i in f.readlines()]
    return email_list

# Return a proxy
def get_proxy(retry=3):
    start = 0
    while start <= retry:
        res = requests.get("")
        dic_info = res.text
        dic_info = json.loads(dic_info)
        status_code = dic_info['code']
        if status_code != '0':
            start += 1
            time.sleep(0.5)
            continue
        data = dic_info['obj']
        ip = data[0]["ip"]
        port = data[0]["port"]
        ip_port =  'http://{}:{}'.format(ip,port)
        return {'http': ip_port}
    return None

if __name__=='__main__':
    app = Validator()
    proxies = get_proxy()
    # Check each address in a loop
    for email in get_list():
        # Use try so one error does not stop the whole run
        try:
            #proxies = get_proxy()
            # Record the check time
            validating_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
            # Build the output line
            info = 'Checked at: {} {}'.format(validating_time, app.run(email,proxies))
            print(info)
            # Append to the log file
            with open('validate_log.txt', 'a+') as f:
                f.write(info + '\n')
            # Throttle between checks
            time.sleep(0.5)
        except requests.exceptions.RequestException as e:
            info = 'Checked at: {} {} check failed, error: {}'.format(validating_time, email, e)
            print(info)
            # Append to the log file
            with open('validate_log.txt', 'a+') as f:
                f.write(info + '\n')
            time.sleep(0.5)
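
For point 2, a rough sketch of the queue idea: every address is put on a queue.Queue once, and each worker thread pops addresses until the queue is empty, so nothing is checked twice. It assumes app = Validator() and proxies = get_proxy() from the code above; the worker count and the lock around the file write are illustrative choices.

import queue
import threading

def worker(q, app, proxies, lock):
    while True:
        try:
            email = q.get_nowait()   # each address is handed to exactly one thread
        except queue.Empty:
            return
        try:
            result = app.run(email, proxies)
        except requests.exceptions.RequestException as e:
            result = '{} check failed: {}'.format(email, e)
        with lock:                   # serialize writes to the log file
            with open('validate_log.txt', 'a+') as f:
                f.write(result + '\n')

q = queue.Queue()
for email in get_list():
    q.put(email)

lock = threading.Lock()
threads = [threading.Thread(target=worker, args=(q, app, proxies, lock)) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()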
        

Comments (4)

霓裳挽歌倾城醉 2022-09-19 03:36:04
from concurrent.futures import ThreadPoolExecutor, as_completed

pool = ThreadPoolExecutor(4)  # 4 threads

EMAILS = [
    # some emails here
]


def check(email):
    """
    if the email is registered, return None, else return the email
    """

    # do something


def save(email):
    """
    save the email
    """

    # do something


def main():
    tasks = [pool.submit(check, email) for email in EMAILS]
    for task in as_completed(tasks):
        check_result = task.result()
        if check_result is None:
            continue
        save(check_result)


if __name__ == '__main__':
    main()

Adapt it yourself from here.
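
One possible way to fill in check() so it matches the docstring, sketched from the question's request (the URL and headers stay redacted, and treating network errors as a skip is an assumption):

import requests

URL = ''        # redacted, as in the question
HEADERS = {}    # redacted

def check(email):
    """If the email is registered return None, else return the email."""
    data = '{"emailAddress": "%s"}' % email
    try:
        response = requests.post(url=URL, headers=HEADERS, data=data, timeout=9)
    except requests.exceptions.RequestException:
        return None  # skipped on network errors; log separately if needed
    if response.status_code == 204:   # 204 = not registered
        return email
    return None                       # 409 (registered) or anything else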

∞觅青森が 2022-09-19 03:36:04

You can push the request and its processing into a thread pool. It's best to append each result to a list and only process that list after all the threads have finished, because writing to a file from multiple threads is not safe.
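
A minimal sketch of that pattern, assuming a placeholder check_email function in place of the real request logic:

from concurrent.futures import ThreadPoolExecutor

def check_email(email):
    # placeholder for the requests.post call from the question
    return '{} checked'.format(email)

emails = ['a@example.com', 'b@example.com']

# run the checks in a thread pool and collect every result into a list
with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(check_email, emails))

# write the log once, after all worker threads have finished
with open('validate_log.txt', 'a+') as f:
    for line in results:
        f.write(line + '\n')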

獨角戲 2022-09-19 03:36:04

Following @听完这一年's answer I modified the code and the main functionality now works. With 50-100 threads it finishes more than ten times faster than before. But I noticed each run produces fewer results than the number of addresses to be checked. The reason is that some requests fail (timeouts, for example) and the error kills that task. I tried using try, but the problem persists. Should the try go into the loop in main()? And if it does, how do I pass the email along so I know which account failed? Many thanks for any answer.

import requests
import json
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

pool = ThreadPoolExecutor(50)

def get_email():
    with open('unvalidated.txt', 'r') as f:
        email_list = [i.strip() for i in f.readlines()]
    return email_list

def get_proxy(retry=3):
    start = 0
    while start <= retry:
        res = requests.get("")
        dic_info = res.text
        dic_info = json.loads(dic_info)
        status_code = dic_info['code']
        if status_code != '0':
            start += 1
            time.sleep(0.5)
            continue
        data = dic_info['obj']
        ip = data[0]["ip"]
        port = data[0]["port"]
        ip_port = 'http://{}:{}'.format(ip,port)
        return {'http': ip_port}
    return None

def check(email,proxies):
    headers = {
    }

    params = (
    )

    url = '1'
    data = '{"emailAddress": "%s"}' % email
    response = requests.post(url=url,params=params,headers=headers,data=data,proxies=proxies,timeout=8)
    try:
        if response.status_code == 204:
            print('not registered-{}'.format(email))
            return 'not registered-{}'.format(email)
        elif response.status_code == 409:
            print('already registered-{}'.format(email))
            return 'already registered-{}'.format(email)
        else:
            print('connection error-{}'.format(email))
            return 'connection error-{}'.format(email)
    except requests.exceptions.RequestException:
        print('connection failed-{}'.format(email))
        return 'connection failed-{}'.format(email)

def save(result):
    with open('validate_log.txt', 'a+') as f:
        f.write(result + '\n')

def main():
    tasks = [pool.submit(check, email, proxies) for email in EMAILS]
    for task in as_completed(tasks):
        check_result = task.result()
        save(check_result)

if __name__ == '__main__':
    proxies = get_proxy()
    EMAILS = get_email()
    main()
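
Note that the try above only wraps the status-code checks, so a timeout raised by requests.post propagates out of check and escapes at task.result() in main. One way to keep the failing email visible (a sketch under that assumption; it reuses the names above and maps each future back to its email):

def main():
    # map every submitted future back to the email it is checking
    future_to_email = {pool.submit(check, email, proxies): email for email in EMAILS}
    for future in as_completed(future_to_email):
        email = future_to_email[future]
        try:
            check_result = future.result()  # re-raises any exception from check()
        except requests.exceptions.RequestException as e:
            check_result = 'check failed-{} error: {}'.format(email, e)
        save(check_result)

Alternatively, the try can be moved inside check so that it wraps the requests.post call itself, where the email is still in scope.
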
冷…雨湿花 2022-09-19 03:36:04

I think coroutines are worth considering here.

import httpx
import asyncio

location_email_list = ['1@qq.com', '2@163.com']

registered_email_list = []
not_registered_email_list = []
error_email_list = []

# network errors are not handled here
async def task(client):
    while len(location_email_list)>0:
        email:str = location_email_list.pop()
        response = await client.get('query_url', params={'email':email})
        code:int = response.status_code
        if code==204:
            not_registered_email_list.append(email)
        elif code==409:
            registered_email_list.append(email)
        else:
            error_email_list.append(email)

async def main():
    proxies = {
        "http": "http://127.0.0.1:3080",
        "https": "http://127.0.0.1:3081",
    }
    async with httpx.AsyncClient(proxies=proxies) as client:
        task_list = [task(client), task(client), task(client), task(client), ]
        await asyncio.gather(*task_list)

asyncio.run(main())
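
If network errors should be handled too, the await can be wrapped in a try; this is a sketch that keeps the lists above and catches httpx.RequestError, the base class for httpx transport errors:

# variant of task() with basic error handling
async def task(client):
    while len(location_email_list) > 0:
        email: str = location_email_list.pop()
        try:
            response = await client.get('query_url', params={'email': email})
        except httpx.RequestError:
            error_email_list.append(email)
            continue
        code: int = response.status_code
        if code == 204:
            not_registered_email_list.append(email)
        elif code == 409:
            registered_email_list.append(email)
        else:
            error_email_list.append(email)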