python consturrent.future不同行运行
我正在编写使用Python通过REST API调用来查询CMDB数据的代码。为了更快地获得结果,我正在使用 Python的模块允许并行查询执行。
以下是代码和输出:
import requests
import os
import base64
import threading
import concurrent.futures
import datetime
class ServiceNowAPI:
__ServiceNowUserName = os.environ.get('ServiceNow_USERNAME')
__ServiceNowPassword = None
__ServiceNowPasswordProd = None
__BASE_URL = None
__WSI_URL = 'https://esiocecr.contoso.com:9443/rsrv_servicenow-outbound/'
__ServiceNow_Cert_Path = 'C:\Certificates\OSMCert.pem'
# ServiceNow API URL paths
__GET_CI_DETAILS = "/api/now/table/cmdb_ci_circuit?sysparm_query=u_port_circuit_idLIKE__CINAME__^ORnameLIKE__CINAME__&sysparam_fields=*"
def __init__(self, ServiceNowEnvironment):
self.ServiceNowEnvironment = ServiceNowEnvironment
self.__ServiceNowPassword = os.environ.get('ServiceNow_PROD_PWD') if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else os.environ.get('ServiceNow_EAGLE_PWD')
self.__BASE_URL = "https://contososervicenow.service-now.com" if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else "https://contosoeagle.service-now.com"
CredsPair = "{0}:{1}".format(self.__ServiceNowUserName, self.__ServiceNowPassword)
CredBytes = CredsPair.encode('ascii')
Base64Creds = base64.b64encode(CredBytes).decode('utf-8')
self.__Authorization = "Basic {0}".format(Base64Creds)
def _GetRouterName(self, RouterLink):
RouterName = ''
with requests.Session() as ServiceNowCall:
ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': RouterLink})
ServiceNowCall.cert = self.__ServiceNow_Cert_Path
ServiceNowCall.verify = self.__ServiceNow_Cert_Path
with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
ResponseJSON = ResponseObject.json()
Results = ResponseJSON['result']
RouterName = Results['name']
return RouterName
def _GetCircuitCIDetails(self, CircuitID):
print('Started for ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
ResponseJSON = ''
URL = "{0}{1}".format(self.__BASE_URL, self.__GET_CI_DETAILS.replace('__CINAME__', CircuitID))
with requests.Session() as ServiceNowCall:
ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': URL})
ServiceNowCall.cert = self.__ServiceNow_Cert_Path
ServiceNowCall.verify = self.__ServiceNow_Cert_Path
with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
ResponseJSON = ResponseObject.json()
AllRecords = list(ResponseJSON['result'])
ActiveRecord = [rec for rec in AllRecords if rec['u_lifecycle_status'] != 'end of life'][0]
Router_Name = self._GetRouterName(ActiveRecord['u_router_name']['link'])
Results = {
'Name': ActiveRecord['name'],
'CarrierName': ActiveRecord['u_carrier_name'],
'NetworkType': ActiveRecord['u_network_type'],
'NetworkSubType': ActiveRecord['u_network_sub_type'],
'RouterName': Router_Name,
'PortCircuitID': ActiveRecord['name'],
'AccessCircuitID': ActiveRecord['u_port_circuit_id']
}
print('Finished ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
yield Results
def GetCIDetails(self, CICSV):
CircuitDetails = []
CircuitIDList = [Circuit.strip() for Circuit in CICSV.split(',')]
CircuitDetails = []
with concurrent.futures.ThreadPoolExecutor() as executor:
CIDetailsResult = { executor.submit(self._GetCircuitCIDetails, CircuitID): CircuitID for CircuitID in CircuitIDList }
for future in concurrent.futures.as_completed(CIDetailsResult):
CircuitCI = CIDetailsResult[future]
try:
CurrentResult = future.result()
except Exception as exc:
ErrorResult = dict({ 'Name': CircuitCI, 'CarrierName': 'NA', 'NetworkType': 'NA', 'NetworkSubType': 'NA', 'RouterName': 'NA', 'PortCircuitID': 'Error', 'AccessCircuitID': 'Error'})
CircuitDetails.extend(ErrorResult)
else:
CircuitDetails.extend(CurrentResult)
return CircuitDetails
if __name__ == "__main__":
ServiceNowAPIClass = ServiceNowAPI('NONPROD')
CIDetails = ServiceNowAPIClass.GetCIDetails('Circuit1,Circuit2')
print(CIDetails)
输出:
Started for Circuit1 at 30-Apr-2022 13:40:06.784841
Finished Circuit1 at 30-Apr-2022 13:40:09.749164
Started for Circuit2 at 30-Apr-2022 13:40:09.751166
Finished Circuit2 at 30-Apr-2022 13:40:12.479171
[{'Name': 'Circuit1', 'CarrierName': 'CenturyLink', 'NetworkType': 'EU-BTM', 'NetworkSubType': 'N/A', 'RouterName': 'RT1234ABCD03', 'PortCircuitID': 'Circuit1', 'AccessCircuitID': 'Circuit1'}, {'Name': 'Circuit2', 'CarrierName': 'Verizon', 'NetworkType': 'DPS-NA', 'NetworkSubType': 'N/A', 'RouterName': 'RT12345678ABC', 'PortCircuitID': 'Circuit2', 'AccessCircuitID': 'Circuit2'}]
但是,您可以看到执行并非并行进行。它正在一个接一个地完成每个电路的查询。
如何对此进行修复以运行_GetCircuitCidetails(self,circuitID)
对所有电路的函数并行?
I am writing code for querying CMDB data from ServiceNow through REST API calls using python. To gain results faster I am using concurrent.future module of python to allow parallel query executions.
Below is the code and output:
import requests
import os
import base64
import threading
import concurrent.futures
import datetime
class ServiceNowAPI:
__ServiceNowUserName = os.environ.get('ServiceNow_USERNAME')
__ServiceNowPassword = None
__ServiceNowPasswordProd = None
__BASE_URL = None
__WSI_URL = 'https://esiocecr.contoso.com:9443/rsrv_servicenow-outbound/'
__ServiceNow_Cert_Path = 'C:\Certificates\OSMCert.pem'
# ServiceNow API URL paths
__GET_CI_DETAILS = "/api/now/table/cmdb_ci_circuit?sysparm_query=u_port_circuit_idLIKE__CINAME__^ORnameLIKE__CINAME__&sysparam_fields=*"
def __init__(self, ServiceNowEnvironment):
self.ServiceNowEnvironment = ServiceNowEnvironment
self.__ServiceNowPassword = os.environ.get('ServiceNow_PROD_PWD') if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else os.environ.get('ServiceNow_EAGLE_PWD')
self.__BASE_URL = "https://contososervicenow.service-now.com" if ServiceNowEnvironment.lower() == "prod" or ServiceNowEnvironment.lower() == "production" else "https://contosoeagle.service-now.com"
CredsPair = "{0}:{1}".format(self.__ServiceNowUserName, self.__ServiceNowPassword)
CredBytes = CredsPair.encode('ascii')
Base64Creds = base64.b64encode(CredBytes).decode('utf-8')
self.__Authorization = "Basic {0}".format(Base64Creds)
def _GetRouterName(self, RouterLink):
RouterName = ''
with requests.Session() as ServiceNowCall:
ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': RouterLink})
ServiceNowCall.cert = self.__ServiceNow_Cert_Path
ServiceNowCall.verify = self.__ServiceNow_Cert_Path
with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
ResponseJSON = ResponseObject.json()
Results = ResponseJSON['result']
RouterName = Results['name']
return RouterName
def _GetCircuitCIDetails(self, CircuitID):
print('Started for ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
ResponseJSON = ''
URL = "{0}{1}".format(self.__BASE_URL, self.__GET_CI_DETAILS.replace('__CINAME__', CircuitID))
with requests.Session() as ServiceNowCall:
ServiceNowCall.headers.update({ 'Authorization': self.__Authorization, 'content-type': 'application/json', 'DP_EXTERNAL_URL': URL})
ServiceNowCall.cert = self.__ServiceNow_Cert_Path
ServiceNowCall.verify = self.__ServiceNow_Cert_Path
with ServiceNowCall.get(self.__WSI_URL) as ResponseObject:
ResponseJSON = ResponseObject.json()
AllRecords = list(ResponseJSON['result'])
ActiveRecord = [rec for rec in AllRecords if rec['u_lifecycle_status'] != 'end of life'][0]
Router_Name = self._GetRouterName(ActiveRecord['u_router_name']['link'])
Results = {
'Name': ActiveRecord['name'],
'CarrierName': ActiveRecord['u_carrier_name'],
'NetworkType': ActiveRecord['u_network_type'],
'NetworkSubType': ActiveRecord['u_network_sub_type'],
'RouterName': Router_Name,
'PortCircuitID': ActiveRecord['name'],
'AccessCircuitID': ActiveRecord['u_port_circuit_id']
}
print('Finished ' + CircuitID + ' at ' + datetime.datetime.now().strftime("%d-%b-%Y %H:%M:%S.%f"))
yield Results
def GetCIDetails(self, CICSV):
CircuitDetails = []
CircuitIDList = [Circuit.strip() for Circuit in CICSV.split(',')]
CircuitDetails = []
with concurrent.futures.ThreadPoolExecutor() as executor:
CIDetailsResult = { executor.submit(self._GetCircuitCIDetails, CircuitID): CircuitID for CircuitID in CircuitIDList }
for future in concurrent.futures.as_completed(CIDetailsResult):
CircuitCI = CIDetailsResult[future]
try:
CurrentResult = future.result()
except Exception as exc:
ErrorResult = dict({ 'Name': CircuitCI, 'CarrierName': 'NA', 'NetworkType': 'NA', 'NetworkSubType': 'NA', 'RouterName': 'NA', 'PortCircuitID': 'Error', 'AccessCircuitID': 'Error'})
CircuitDetails.extend(ErrorResult)
else:
CircuitDetails.extend(CurrentResult)
return CircuitDetails
if __name__ == "__main__":
ServiceNowAPIClass = ServiceNowAPI('NONPROD')
CIDetails = ServiceNowAPIClass.GetCIDetails('Circuit1,Circuit2')
print(CIDetails)
Output:
Started for Circuit1 at 30-Apr-2022 13:40:06.784841
Finished Circuit1 at 30-Apr-2022 13:40:09.749164
Started for Circuit2 at 30-Apr-2022 13:40:09.751166
Finished Circuit2 at 30-Apr-2022 13:40:12.479171
[{'Name': 'Circuit1', 'CarrierName': 'CenturyLink', 'NetworkType': 'EU-BTM', 'NetworkSubType': 'N/A', 'RouterName': 'RT1234ABCD03', 'PortCircuitID': 'Circuit1', 'AccessCircuitID': 'Circuit1'}, {'Name': 'Circuit2', 'CarrierName': 'Verizon', 'NetworkType': 'DPS-NA', 'NetworkSubType': 'N/A', 'RouterName': 'RT12345678ABC', 'PortCircuitID': 'Circuit2', 'AccessCircuitID': 'Circuit2'}]
However, as you can see the executions are not happening in Parallel. It is finishing the queries for each circuit one after another.
How can I fix this to run the _GetCircuitCIDetails(self, CircuitID)
function against all CircuitIDs in parallel?
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论