Memory safe permutation generator with increasing length value in ThreadPoolExecutor
Thanks to @rici in the comments for steering me in the right direction on this. I have discovered that concurrent.futures.map() and concurrent.futures.submit() process their iterables immediately, whereas Python's built-in map() can step through an iterable lazily, which is much more desirable when dealing with large product and permutation spaces. The concurrent.futures route uses up all RAM once it gets to combinations of length 2 or more in the example code below.
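The difference is easy to see with a small illustration (probe() is just a stand-in here, not part of the scanner):

import concurrent.futures
import itertools

def probe(x):
    print("processing", x)
    return x

# Python's built-in map() is lazy: nothing runs until the result is iterated.
lazy = map(probe, itertools.count())
print(next(lazy))  # evaluates exactly one item

# ThreadPoolExecutor.map(), by contrast, consumes its input iterable up front
# to queue one task per element, so an unbounded (or merely enormous) iterable
# like itertools.count() or a large product() exhausts memory:
#
# with concurrent.futures.ThreadPoolExecutor() as executor:
#     executor.map(probe, itertools.count())  # queues tasks until RAM runs out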
What I'm looking to do now is implement what I have in the updated code below, but with multithreading: I want to multithread Python's built-in map(), with the workers pulling items from one common product generator. I've commented out the "working" multithreaded example for reference and to show what I was trying to accomplish.
I stumbled upon a potential fix in the main_lazy function from this post, however I'm confused about how to implement that with my code's function, which returns 2 values. The maps and zips and lambdas confuse me here, and I'm not sure how the chunking would work with the space I'm working with, but maybe it will make sense to someone else.
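For what it's worth, here is a sketch of how that chunking idea could fit this code; the chunked() helper, the 1000 chunk size, and the executor wiring are my guesses at what that post describes, not its exact code. Because ping_current_api_endpoint returns a 2-tuple, each result unpacks directly in the for loop:

from itertools import islice

def chunked(iterable, size):
    # Yield successive lists of at most `size` items from any iterator, so
    # only one chunk's worth of endpoint tuples is in memory at a time.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
#     for chunk in chunked(product(list_of_individual_api_endpoints, repeat=i), 1000):
#         for status_code, endpoint in executor.map(ping_current_api_endpoint, chunk):
#             ...  # handle each (status_code, endpoint) pair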
For now, here is the single-threaded version of the memory safe code that I'm trying to multithread. Note that I don't care about the math behind how many combinations this generates, as it's irrelevant to my use case, so long as it keeps memory usage down. Here's the updated code.
To reproduce:
- Download VAmPI and start the server
- Update BASE_URL in the code below to match your server
- Run this code
import concurrent.futures
from itertools import product, chain, islice
import requests, urllib.request
# ---------------------------------------------------------------------------- #
# Variables #
# ---------------------------------------------------------------------------- #
MAX_ENDPOINT_PERMUTATION_LENGTH = 3
MAX_WORKERS = 6
# BASE_URL = 'http://localhost:5000/'
BASE_URL = 'http://172.16.1.82:5000/' # This should be the VAmPI URL of the
# server on your machine
if BASE_URL[-1] != "/":
    BASE_URL = BASE_URL + "/"
# ---------------------------------------------------------------------------- #
# Retrieve list of endpoints to product'ize #
# ---------------------------------------------------------------------------- #
list_of_individual_api_endpoints = []
url = r"https://gist.githubusercontent.com/yassineaboukir/8e12adefbd505ef704674ad6ad48743d/raw/3ea2b7175f2fcf8e6de835c72cb2b2048f73f847/List%2520of%2520API%2520endpoints%2520&%2520objects"
file = urllib.request.urlopen(url)
for line in file:
    decoded_line = line.decode("utf-8").replace("\n", "")
    list_of_individual_api_endpoints.append(decoded_line)
# ---------------------------------------------------------------------------- #
# The multithreaded function we're going to use #
# ---------------------------------------------------------------------------- #
def ping_current_api_endpoint(endpoint):
    # Construct a proper endpoint path from the passed-in tuple
    new_endpoint = ""
    for x in endpoint:
        new_endpoint += str(x) + "/"
    new_endpoint = new_endpoint[:-1]  # drop the trailing slash
    # Ping the endpoint to get a response code
    response = requests.get(BASE_URL + new_endpoint)
    status_code = response.status_code
    return status_code, new_endpoint
# # ---------------------------------------------------------------------------- #
# # Main Function #
# # ---------------------------------------------------------------------------- #
# # MULTITHREADED ATTEMPT. EATS UP RAM WHEN GETTING TO DEPTH OF 2
# def main():
#     results_dict = {'endpoint':[], 'status_code': []}
#     # Start the threadpool
#     with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
#         # Try/Except for a keyboard interrupt. If this is not the correct implementation
#         # to stop a multithreaded pool, please demonstrate the correct way
#         try:
#             # Iterate from 1 to 3
#             for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
#                 print("Checking endpoints with depth of", i)
#                 # Tried this with both .map and .submit
#                 future = executor.submit(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
#                 status_code = future.result()[0]
#                 endpoint = future.result()[1]
#                 if str(status_code) != "404":
#                     results_dict['endpoint'].append(endpoint)
#                     results_dict['status_code'].append(status_code)
#                     print("Endpoint:", endpoint, ", Status Code:", status_code)
#         except KeyboardInterrupt:
#             print("Early stopping engaged...")
#             pass
#     # Print the results dict
#     print(results_dict)
# LAZY MAP FUNCTION, SINGLE THREADED, THAT I'D LIKE TO TURN INTO MULTI
def main_lazy():
    results_dict = {'endpoint':[], 'status_code': []}
    for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
        print("Checking endpoints with depth of", i)
        results = map(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
        for status_code, endpoint in results:
            if str(status_code) != "404":
                results_dict['endpoint'].append(endpoint)
                results_dict['status_code'].append(status_code)
                print("Endpoint:", endpoint, ", Status Code:", status_code)
    # Print the results dict
    print(results_dict)
# ---------------------------------------------------------------------------- #
# Start Program #
# ---------------------------------------------------------------------------- #
if __name__ == "__main__":
    # main()
    main_lazy()
I figured out a solution. After the section of code that gets the endpoints list from GitHub, I use the following:
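(The answer's code block did not survive this copy; the sketch below reconstructs the approach it describes: one lazy, lock-guarded generator shared by several plain worker threads. The worker() helper and the variable names are my reconstruction, not the original code.)

import threading
from itertools import product, chain

# Chain the per-depth products into one lazy stream of endpoint tuples.
endpoint_generator = chain.from_iterable(
    product(list_of_individual_api_endpoints, repeat=i)
    for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH + 1)
)

# Guarding next() with a lock makes the shared generator thread safe.
generator_lock = threading.Lock()

def worker():
    while True:
        with generator_lock:
            try:
                endpoint = next(endpoint_generator)
            except StopIteration:
                return  # generator exhausted, this worker is done
        status_code, new_endpoint = ping_current_api_endpoint(endpoint)
        if str(status_code) != "404":
            print("Endpoint:", new_endpoint, ", Status Code:", status_code)

threads = [threading.Thread(target=worker) for _ in range(MAX_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()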
This creates a thread safe, memory safe generator and multiple threads for the workers. The only thing missing now is how to make Ctrl+C work with this, but whatever.
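One possible way to get Ctrl+C working, assuming the worker setup sketched above (this replaces the start/join lines and is my untested suggestion, not part of the answer): make the workers daemon threads and poll them with a short join timeout, so KeyboardInterrupt reaches the main thread and the daemons die when the process exits.

# Daemon threads are killed when the main thread exits, so a KeyboardInterrupt
# raised in the main thread ends the whole scan.
threads = [threading.Thread(target=worker, daemon=True) for _ in range(MAX_WORKERS)]
for t in threads:
    t.start()
try:
    while any(t.is_alive() for t in threads):
        for t in threads:
            t.join(timeout=0.5)  # short timeout keeps the main thread interruptible
except KeyboardInterrupt:
    print("Early stopping engaged...")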