python consturrent.future不同行运行

发布于 2025-01-25 15:13:55 字数 5673 浏览 4 评论 0原文

我正在编写使用Python通过REST API调用来查询CMDB数据的代码。为了更快地获得结果,我正在使用 Python的模块允许并行查询执行。

以下是代码和输出:

import requests
import os
import base64
import threading
import concurrent.futures
import datetime

class ServiceNowAPI:
    __ServiceNowUserName = os.environ.get('ServiceNow_USERNAME')
    __ServiceNowPassword = None
    __ServiceNowPasswordProd = None
    __BASE_URL = None
    __WSI_URL = 'https://esiocecr.contoso.com:9443/rsrv_servicenow-outbound/'
    __ServiceNow_Cert_Path = 'C:\Certificates\OSMCert.pem'
    # ServiceNow API URL paths
    __GET_CI_DETAILS = "/api/now/table/cmdb_ci_circuit?sysparm_query=u_port_circuit_idLIKE__CINAME__^ORnameLIKE__CINAME__&sysparam_fields=*"

def __init__(self, ServiceNowEnvironment):
    self.ServiceNowEnvironment = ServiceNowEnvironment
    self.__ServiceNowPassword = os.environ.get('ServiceNow_PROD_PWD') if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else os.environ.get('ServiceNow_EAGLE_PWD')
    self.__BASE_URL = "https://contososervicenow.service-now.com" if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else "https://contosoeagle.service-now.com"
    CredsPair = "{0}:{1}".format(self.__ServiceNowUserName, self.__ServiceNowPassword)
    CredBytes = CredsPair.encode('ascii')
    Base64Creds = base64.b64encode(CredBytes).decode('utf-8')
    self.__Authorization = "Basic {0}".format(Base64Creds)

def _GetRouterName(self, RouterLink):
    RouterName = ''
    with requests.Session() as ServiceNowCall:
        ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': RouterLink})
        ServiceNowCall.cert = self.__ServiceNow_Cert_Path
        ServiceNowCall.verify = self.__ServiceNow_Cert_Path
        with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
            ResponseJSON = ResponseObject.json()
            Results = ResponseJSON['result']
            RouterName = Results['name']
            return RouterName

def _GetCircuitCIDetails(self, CircuitID):
    print('Started for ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
    ResponseJSON = ''
    URL = "{0}{1}".format(self.__BASE_URL, self.__GET_CI_DETAILS.replace('__CINAME__', CircuitID))
    with requests.Session() as ServiceNowCall:
        ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': URL})
        ServiceNowCall.cert = self.__ServiceNow_Cert_Path
        ServiceNowCall.verify = self.__ServiceNow_Cert_Path
        with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
            ResponseJSON = ResponseObject.json()
            AllRecords = list(ResponseJSON['result'])
            ActiveRecord = [rec for rec in AllRecords if rec['u_lifecycle_status'] != 'end of life'][0]
            Router_Name = self._GetRouterName(ActiveRecord['u_router_name']['link'])
            Results = {
                        'Name': ActiveRecord['name'],
                        'CarrierName': ActiveRecord['u_carrier_name'],
                        'NetworkType': ActiveRecord['u_network_type'],
                        'NetworkSubType': ActiveRecord['u_network_sub_type'],
                        'RouterName': Router_Name,
                        'PortCircuitID': ActiveRecord['name'],
                        'AccessCircuitID': ActiveRecord['u_port_circuit_id']
                    }
            print('Finished ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
            yield Results

def GetCIDetails(self, CICSV):
    CircuitDetails = []
    CircuitIDList = [Circuit.strip() for Circuit in CICSV.split(',')]

    CircuitDetails = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        CIDetailsResult = { executor.submit(self._GetCircuitCIDetails, CircuitID): CircuitID for CircuitID in CircuitIDList }
        for future in concurrent.futures.as_completed(CIDetailsResult):
            CircuitCI = CIDetailsResult[future]
            try:
                CurrentResult = future.result()
            except Exception as exc:
                ErrorResult = dict({ 'Name': CircuitCI, 'CarrierName': 'NA', 'NetworkType': 'NA', 'NetworkSubType': 'NA', 'RouterName': 'NA', 'PortCircuitID': 'Error', 'AccessCircuitID': 'Error'})
                CircuitDetails.extend(ErrorResult)
            else:
                CircuitDetails.extend(CurrentResult)
    return CircuitDetails

if __name__ == "__main__":
    ServiceNowAPIClass = ServiceNowAPI('NONPROD')
    CIDetails = ServiceNowAPIClass.GetCIDetails('Circuit1,Circuit2')
    print(CIDetails)

输出

Started for Circuit1 at 30-Apr-2022 13:40:06.784841
Finished Circuit1 at 30-Apr-2022 13:40:09.749164
Started for Circuit2 at 30-Apr-2022 13:40:09.751166
Finished Circuit2 at 30-Apr-2022 13:40:12.479171
[{'Name': 'Circuit1', 'CarrierName': 'CenturyLink', 'NetworkType': 'EU-BTM', 'NetworkSubType': 'N/A', 'RouterName': 'RT1234ABCD03', 'PortCircuitID': 'Circuit1', 'AccessCircuitID': 'Circuit1'}, {'Name': 'Circuit2', 'CarrierName': 'Verizon', 'NetworkType': 'DPS-NA', 'NetworkSubType': 'N/A', 'RouterName': 'RT12345678ABC', 'PortCircuitID': 'Circuit2', 'AccessCircuitID': 'Circuit2'}]

但是,您可以看到执行并非并行进行。它正在一个接一个地完成每个电路的查询。

如何对此进行修复以运行_GetCircuitCidetails(self,circuitID)对所有电路的函数并行?

I am writing code for querying CMDB data from ServiceNow through REST API calls using python. To gain results faster I am using concurrent.future module of python to allow parallel query executions.

Below is the code and output:

import requests
import os
import base64
import threading
import concurrent.futures
import datetime

class ServiceNowAPI:
    __ServiceNowUserName = os.environ.get('ServiceNow_USERNAME')
    __ServiceNowPassword = None
    __ServiceNowPasswordProd = None
    __BASE_URL = None
    __WSI_URL = 'https://esiocecr.contoso.com:9443/rsrv_servicenow-outbound/'
    __ServiceNow_Cert_Path = 'C:\Certificates\OSMCert.pem'
    # ServiceNow API URL paths
    __GET_CI_DETAILS = "/api/now/table/cmdb_ci_circuit?sysparm_query=u_port_circuit_idLIKE__CINAME__^ORnameLIKE__CINAME__&sysparam_fields=*"

def __init__(self, ServiceNowEnvironment):
    self.ServiceNowEnvironment = ServiceNowEnvironment
    self.__ServiceNowPassword = os.environ.get('ServiceNow_PROD_PWD') if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else os.environ.get('ServiceNow_EAGLE_PWD')
    self.__BASE_URL = "https://contososervicenow.service-now.com" if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else "https://contosoeagle.service-now.com"
    CredsPair = "{0}:{1}".format(self.__ServiceNowUserName, self.__ServiceNowPassword)
    CredBytes = CredsPair.encode('ascii')
    Base64Creds = base64.b64encode(CredBytes).decode('utf-8')
    self.__Authorization = "Basic {0}".format(Base64Creds)

def _GetRouterName(self, RouterLink):
    RouterName = ''
    with requests.Session() as ServiceNowCall:
        ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': RouterLink})
        ServiceNowCall.cert = self.__ServiceNow_Cert_Path
        ServiceNowCall.verify = self.__ServiceNow_Cert_Path
        with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
            ResponseJSON = ResponseObject.json()
            Results = ResponseJSON['result']
            RouterName = Results['name']
            return RouterName

def _GetCircuitCIDetails(self, CircuitID):
    print('Started for ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
    ResponseJSON = ''
    URL = "{0}{1}".format(self.__BASE_URL, self.__GET_CI_DETAILS.replace('__CINAME__', CircuitID))
    with requests.Session() as ServiceNowCall:
        ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': URL})
        ServiceNowCall.cert = self.__ServiceNow_Cert_Path
        ServiceNowCall.verify = self.__ServiceNow_Cert_Path
        with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
            ResponseJSON = ResponseObject.json()
            AllRecords = list(ResponseJSON['result'])
            ActiveRecord = [rec for rec in AllRecords if rec['u_lifecycle_status'] != 'end of life'][0]
            Router_Name = self._GetRouterName(ActiveRecord['u_router_name']['link'])
            Results = {
                        'Name': ActiveRecord['name'],
                        'CarrierName': ActiveRecord['u_carrier_name'],
                        'NetworkType': ActiveRecord['u_network_type'],
                        'NetworkSubType': ActiveRecord['u_network_sub_type'],
                        'RouterName': Router_Name,
                        'PortCircuitID': ActiveRecord['name'],
                        'AccessCircuitID': ActiveRecord['u_port_circuit_id']
                    }
            print('Finished ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
            yield Results

def GetCIDetails(self, CICSV):
    CircuitDetails = []
    CircuitIDList = [Circuit.strip() for Circuit in CICSV.split(',')]

    CircuitDetails = []

    with concurrent.futures.ThreadPoolExecutor() as executor:
        CIDetailsResult = { executor.submit(self._GetCircuitCIDetails, CircuitID): CircuitID for CircuitID in CircuitIDList }
        for future in concurrent.futures.as_completed(CIDetailsResult):
            CircuitCI = CIDetailsResult[future]
            try:
                CurrentResult = future.result()
            except Exception as exc:
                ErrorResult = dict({ 'Name': CircuitCI, 'CarrierName': 'NA', 'NetworkType': 'NA', 'NetworkSubType': 'NA', 'RouterName': 'NA', 'PortCircuitID': 'Error', 'AccessCircuitID': 'Error'})
                CircuitDetails.extend(ErrorResult)
            else:
                CircuitDetails.extend(CurrentResult)
    return CircuitDetails

if __name__ == "__main__":
    ServiceNowAPIClass = ServiceNowAPI('NONPROD')
    CIDetails = ServiceNowAPIClass.GetCIDetails('Circuit1,Circuit2')
    print(CIDetails)

Output:

Started for Circuit1 at 30-Apr-2022 13:40:06.784841
Finished Circuit1 at 30-Apr-2022 13:40:09.749164
Started for Circuit2 at 30-Apr-2022 13:40:09.751166
Finished Circuit2 at 30-Apr-2022 13:40:12.479171
[{'Name': 'Circuit1', 'CarrierName': 'CenturyLink', 'NetworkType': 'EU-BTM', 'NetworkSubType': 'N/A', 'RouterName': 'RT1234ABCD03', 'PortCircuitID': 'Circuit1', 'AccessCircuitID': 'Circuit1'}, {'Name': 'Circuit2', 'CarrierName': 'Verizon', 'NetworkType': 'DPS-NA', 'NetworkSubType': 'N/A', 'RouterName': 'RT12345678ABC', 'PortCircuitID': 'Circuit2', 'AccessCircuitID': 'Circuit2'}]

However, as you can see the executions are not happening in Parallel. It is finishing the queries for each circuit one after another.

How can I fix this to run the _GetCircuitCIDetails(self, CircuitID) function against all CircuitIDs in parallel?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文