Python中IP地址的多线程ping

发布于 2025-01-31 11:10:07 字数 1926 浏览 2 评论 0原文

我有一个IP地址列表,例如1000 no。我正在阅读ip_file.txt,并将结果文件存储为result_date.txt。以下是我达到结果的代码。但是我的问题是执行整个文件需要花费太长时间。有人可以建议多线程,以便可以快速实现所需的结果吗?提前致谢。

#!/usr/bin/env python
import os
import csv
import paramiko
from datetime import datetime
import time
import sys
import re
from collections import defaultdict

# Verifies your os type
from paramiko import file

OS_TYPE = os.name
# Sets the count modifier to the os type
count = '-n' if OS_TYPE == 'nt' else '-c'


def create_ip_list():
    ip_list = []
    with open("ip_file.txt", "r") as file:
        for line in file:
            ip_list.append(line.strip())
    return ip_list

# fetching data
now = datetime.now()
dat = now.strftime("%d/%m/%Y")
# time = now.strftime("%H:%M:%S")
date_string = dat.replace('/', '-')

timestr = time.strftime("%d%m%Y-%H%M%S")


def ping_device(ip_list):
    """Ping ip_list and return results
        return: None
        rtype: None
    """
    results_file = open("results_" + str(timestr) + ".txt", "w")
    for ip in ip_list:
        response = os.popen(f"ping {ip} {count} 1").read()
        time.sleep(1.5)
        #fetch Average time
        print(response)
        for i in response.split("\n"):
            para = i.split("=")
            try:
                if para[0].strip() == "Minimum":
                    latency = para[3].strip()
                    print(latency)
                    # output1=latency[0:8].split(" ")
                    # test=output1[0]
                    # print(test)
            except:
                print("time run")

        if "Received = 1" and "Approximate" in response:
            #print(f"UP {ip} Ping Successful")
            results_file.write(f"{ip},UP,{latency}" + "\n")
        else:
            print(f"Down {ip} Ping Unsuccessful")
            results_file.write(f"{ip} Down" + "\n")
    results_file.close()

if __name__ == "__main__":
    ping_device(create_ip_list())

I have a list of IP addresses like 1000 no's. I am reading the ip_file.txt and storing the result file as result_date.txt. Below is the code that I achieved the result. But my issue is it's taking too long to execute the entire files. Can anyone suggest multithreading, please so that the desired result can be achieved quickly? Thanks in advance.

#!/usr/bin/env python
import os
import csv
import paramiko
from datetime import datetime
import time
import sys
import re
from collections import defaultdict

# Verifies your os type
from paramiko import file

OS_TYPE = os.name
# Sets the count modifier to the os type
count = '-n' if OS_TYPE == 'nt' else '-c'


def create_ip_list():
    ip_list = []
    with open("ip_file.txt", "r") as file:
        for line in file:
            ip_list.append(line.strip())
    return ip_list

# fetching data
now = datetime.now()
dat = now.strftime("%d/%m/%Y")
# time = now.strftime("%H:%M:%S")
date_string = dat.replace('/', '-')

timestr = time.strftime("%d%m%Y-%H%M%S")


def ping_device(ip_list):
    """Ping ip_list and return results
        return: None
        rtype: None
    """
    results_file = open("results_" + str(timestr) + ".txt", "w")
    for ip in ip_list:
        response = os.popen(f"ping {ip} {count} 1").read()
        time.sleep(1.5)
        #fetch Average time
        print(response)
        for i in response.split("\n"):
            para = i.split("=")
            try:
                if para[0].strip() == "Minimum":
                    latency = para[3].strip()
                    print(latency)
                    # output1=latency[0:8].split(" ")
                    # test=output1[0]
                    # print(test)
            except:
                print("time run")

        if "Received = 1" and "Approximate" in response:
            #print(f"UP {ip} Ping Successful")
            results_file.write(f"{ip},UP,{latency}" + "\n")
        else:
            print(f"Down {ip} Ping Unsuccessful")
            results_file.write(f"{ip} Down" + "\n")
    results_file.close()

if __name__ == "__main__":
    ping_device(create_ip_list())

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(1

岁月染过的梦 2025-02-07 11:10:07

编写一个功能ping_one_device,该采用单个ip并返回一个给出状态的单个字符串。从ping_device中将其拉出应该很容易。

然后

with open(results_file, "w") as results_file:
    with ThreadPoolExecutor() as executor:
        for result in map(ping_one_device, ip_list):
            results_file.write(result)

Write a function ping_one_device that takes a single ip and returns a single string giving the status. It should be easy to pull this out of ping_device.

Then

with open(results_file, "w") as results_file:
    with ThreadPoolExecutor() as executor:
        for result in map(ping_one_device, ip_list):
            results_file.write(result)
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文