Memory-safe permutation generator with increasing length value in ThreadPoolExecutor

Posted on 2025-02-01 15:52:36

Thanks to @rici in the comments for steering me in the right direction on this. I have discovered that the concurrent.futures methods Executor.map() and Executor.submit() process iterables eagerly, whereas Python's built-in map() function walks iterables lazily, which is much more desirable when dealing with large product and permutation spaces. The concurrent.futures route uses up all RAM once it reaches combinations of length 2 or more in the example code below.
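The difference described above can be checked with a small experiment. This is only a sketch: `counting_source`, `identity` and `items_pulled_before_first_result` are made-up names, and the eager behaviour shown is what `Executor.map()` does when called with its default arguments.

```python
from concurrent.futures import ThreadPoolExecutor


def counting_source(n, pulled):
    """Yield 0..n-1 and record every item at the moment it is pulled."""
    for i in range(n):
        pulled.append(i)
        yield i


def identity(x):
    return x


def items_pulled_before_first_result(use_executor, n=1000):
    """Return how many source items were consumed to obtain ONE result."""
    pulled = []
    source = counting_source(n, pulled)
    if use_executor:
        with ThreadPoolExecutor(max_workers=2) as executor:
            # Executor.map() submits one task per item as soon as it is called.
            results = executor.map(identity, source)
            next(results)
            return len(pulled)
    # Built-in map() only advances the source when a result is requested.
    results = map(identity, source)
    next(results)
    return len(pulled)
```

With the built-in `map()` a single item is pulled; with the executor the whole source is drained (and one future per item is held in memory) before the first result is read, which matches the RAM growth seen at depth 2 and above.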

What I want to do now is make the updated code below multithreaded: run the equivalent of Python's built-in map() function across several threads, with all workers pulling items from one shared product generator. I've commented out the "working" multithreaded example for reference and to show what I was trying to accomplish.

I stumbled upon a potential fix in the main_lazy function from this post; however, I'm confused about how to apply it to my function, which returns two values. The maps, zips and lambdas confuse me here, and I'm not sure how the chunking would work with the space I'm working with, but maybe it'll make sense to someone else.
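One way the chunking idea could be applied to a worker that returns two values is sketched below. It is not the code from the linked post: `chunked`, `fake_ping` and `scan` are hypothetical names, and `fake_ping` is a network-free stand-in for `ping_current_api_endpoint` that pretends only `users/v1` exists.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice, product


def chunked(iterable, size):
    """Yield lists of at most `size` items without materializing the source."""
    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def fake_ping(endpoint):
    """Stand-in worker (assumption): returns (status_code, path) like the real one."""
    path = "/".join(endpoint)
    status_code = 200 if path == "users/v1" else 404
    return status_code, path


def scan(words, depth, max_workers=6, chunk_size=100):
    """Run the worker over product(words, repeat=depth), one chunk at a time."""
    hits = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for chunk in chunked(product(words, repeat=depth), chunk_size):
            # Only `chunk_size` tasks exist at once; each result is a 2-tuple
            # that unpacks directly in the for statement.
            for status_code, path in executor.map(fake_ping, chunk):
                if status_code != 404:
                    hits.append((path, status_code))
    return hits
```

The two return values need no zip or lambda: `executor.map()` yields whatever the worker returns, so tuple unpacking in the loop is enough. The trade-off is that each chunk must finish before the next one starts, so a slow request can leave some threads idle for a moment.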

For now, here is the single-threaded version of the memory-safe code that I'm trying to multithread.

Note that I don't care about the math behind how many combinations this generates, as it's irrelevant to my use case, so long as memory usage stays low. Here's the updated code.

To reproduce:

  1. Download VAmPI and start the server
  2. Update the BASE_URL in the code below to match your server
  3. Run this code
import concurrent.futures
from itertools import product, chain, islice
import requests
import urllib.request  # urlopen() lives in the urllib.request submodule


# ---------------------------------------------------------------------------- #
#                                   Variables                                  #
# ---------------------------------------------------------------------------- #
MAX_ENDPOINT_PERMUTATION_LENGTH = 3
MAX_WORKERS = 6
# BASE_URL = 'http://localhost:5000/'
BASE_URL = 'http://172.16.1.82:5000/'  # This should be the VAmPI URL of the
                                       # server on your machine
if BASE_URL[-1] != "/":
    BASE_URL = BASE_URL + "/"


# ---------------------------------------------------------------------------- #
#                   Retrieve list of endpoints to product'ize                  #
# ---------------------------------------------------------------------------- #
list_of_individual_api_endpoints = []
url = r"https://gist.githubusercontent.com/yassineaboukir/8e12adefbd505ef704674ad6ad48743d/raw/3ea2b7175f2fcf8e6de835c72cb2b2048f73f847/List%2520of%2520API%2520endpoints%2520&%2520objects"
file = urllib.request.urlopen(url)
for line in file:
    decoded_line = line.decode("utf-8").replace("\n","")
    list_of_individual_api_endpoints.append(decoded_line)


# ---------------------------------------------------------------------------- #
#                 The multithreaded function we're going to use                #
# ---------------------------------------------------------------------------- #
def ping_current_api_endpoint(endpoint):
    # Deconstruct a proper endpoint from the passed in tuple
    new_endpoint = ""
    for x in endpoint:
        new_endpoint += str(x) + "/"
    new_endpoint = new_endpoint[:-1]
    # Ping the endpoint to get a response code
    response = requests.get(BASE_URL + str(new_endpoint))
    status_code = response.status_code
    return status_code, new_endpoint


# # ---------------------------------------------------------------------------- #
# #                                 Main Function                                #
# # ---------------------------------------------------------------------------- #
# # MULTITHREADED ATTEMPT. EATS UP RAM WHEN GETTING TO DEPTH OF 2
# def main():
#     results_dict = {'endpoint':[], 'status_code': []}
#     # Start the threadpool
#     with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
#         # Try/Except for a keyboard interrupt. If this is not the correct implementation
#         # to stop a multithreaded pool, please demonstrate the correct way
#         try:
#             # Iterate from 1 to 3
#             for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
#                 print("Checking endpoints with depth of", i)
#                 # Can change this from .submit to .map, tried them both
#                 future = executor.submit(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i))
#                 status_code = future.result()[0]
#                 endpoint = future.result()[1]
#                 if str(status_code) != "404":
#                     results_dict['endpoint'].append(endpoint)
#                     results_dict['status_code'].append(status_code)
#                     print("Endpoint:", endpoint, ", Status Code:", status_code)
#         except KeyboardInterrupt:
#             print("Early stopping engaged...")
#             pass
#     # Print the results dict
#     print(results_dict)


# LAZY MAP FUNCTION, SINGLE THREADED, THAT I'D LIKE TO TURN INTO MULTI
def main_lazy():
    results_dict = {'endpoint':[], 'status_code': []}
    for i in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):
        print("Checking endpoints with depth of", i)
        results = map(ping_current_api_endpoint, (product(list_of_individual_api_endpoints, repeat=i)))
        for status_code, endpoint in results:
            # print(status_code, endpoint)
            if str(status_code) != "404":
                results_dict['endpoint'].append(endpoint)
                results_dict['status_code'].append(status_code)
                print("Endpoint:", endpoint, ", Status Code:", status_code)

# ---------------------------------------------------------------------------- #
#                                 Start Program                                #
# ---------------------------------------------------------------------------- #
if __name__ == "__main__":
    # main()
    main_lazy()


Comments (2)

活泼老夫 2025-02-08 15:52:36

I figured out a solution. After the section of code that gets the endpoints list from GitHub, I use the following:

# ---------------------------------------------------------------------------- #
#                        Function to Ping API Endpoints                        #
# ---------------------------------------------------------------------------- #
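# Imports this snippet needs on top of those in the question's code
import threading
import time
from pandas import DataFrame

# Create Thread Safe Class for Generator and Worker Function
results_dict = {"endpoint": [], "status_code": []}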
class LockedIterator(object):
    def __init__(self, iterator):
        self.lock = threading.Lock()
        self.iterator = iter(iterator)
    def __iter__(self): return self
    def __next__(self):
        with self.lock:
            return self.iterator.__next__()
            
def generator_function(repeat):
    # list_of_individual_api_endpoints is the list built in the question's code
    for x in product(list_of_individual_api_endpoints, repeat=repeat):
        yield x

def worker_function(current_gen_value):
    for endpoint in current_gen_value:
        # time.sleep(randint(0,2))
        # Join every segment of the tuple into one path, e.g. ("a", "b") -> "a/b"
        new_endpoint = "/".join(str(x) for x in endpoint)
        response = requests.get(BASE_URL + str(new_endpoint))
        status_code = response.status_code
        if str(status_code) != "404":
            results_dict['endpoint'].append(endpoint)
            results_dict['status_code'].append(status_code)
            print("Endpoint:", endpoint, ", Status Code:", status_code)


# ---------------------------------------------------------------------------- #
#                              Main Program Start                              #
# ---------------------------------------------------------------------------- #
start_time = time.time()

for repeat in range(1, MAX_ENDPOINT_PERMUTATION_LENGTH+1):

    thread_safe_generator = LockedIterator(generator_function(repeat))

    threads_list = []
    for _ in range(MAX_WORKERS):
        thread = threading.Thread(target=worker_function, args=(thread_safe_generator,))
        # thread.daemon = True
        threads_list.append(thread)
    for thread in threads_list:
        thread.start()
    for thread in threads_list:
        thread.join()
    
results_df = DataFrame.from_dict(results_dict)
results_df = results_df.sort_values(by='status_code', ascending=True).reset_index(drop=True)
results_df.to_csv("endpoint_results.csv", index=False)
print(results_df)
print("Elapsed time:", int((time.time() - start_time) / 60), "minutes." )

This creates a thread-safe, memory-safe generator and multiple worker threads. The only thing missing now is how to make Ctrl+C work with this, but whatever.
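For the missing Ctrl+C handling, one possible approach is a shared `threading.Event` that the workers check between items. The sketch below is an assumption, not part of the answer above: `run_workers`, `worker` and `handle_item` are hypothetical names, and `handle_item` stands for whatever does the request.

```python
import threading


class LockedIterator:
    """Serialize next() calls so several threads can share one iterator."""

    def __init__(self, iterable):
        self._lock = threading.Lock()
        self._iterator = iter(iterable)

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            return next(self._iterator)


def worker(shared_iterator, stop_event, handle_item):
    """Process items until the iterator is exhausted or a stop is requested."""
    for item in shared_iterator:
        if stop_event.is_set():
            break
        handle_item(item)


def run_workers(iterable, handle_item, num_workers=4, stop_event=None):
    """Run the workers; return True if they were asked to stop early."""
    if stop_event is None:
        stop_event = threading.Event()
    shared = LockedIterator(iterable)
    threads = [
        threading.Thread(target=worker, args=(shared, stop_event, handle_item))
        for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    try:
        for thread in threads:
            # Join in short slices so the main thread stays responsive
            # to KeyboardInterrupt while the workers are busy.
            while thread.is_alive():
                thread.join(timeout=0.2)
    except KeyboardInterrupt:
        stop_event.set()  # workers exit after finishing their current item
        for thread in threads:
            thread.join()
    return stop_event.is_set()
```

Each worker finishes the request it is already making before it notices the flag, so shutdown is delayed by at most one request per thread. Passing a timeout to `requests.get()` would keep that delay bounded.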

我早已燃尽 2025-02-08 15:52:36
from itertools import product
from concurrent.futures import ThreadPoolExecutor, as_completed


words = ["I", "like", "to", "take", "my", "dogs", "for", "a",
         "walk", "every", "day", "after", "work"]


def gen():
    for i in product(words, repeat=3):
        yield i


def worker(rec_str):
    return rec_str


def main():
    with ThreadPoolExecutor() as executor:
        fs = (executor.submit(worker, i) for i in gen())
        for i in as_completed(fs):
            print(i.result())


if __name__ == "__main__":
    main()
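One caveat about the snippet above: in CPython, `as_completed()` turns its argument into a set before yielding anything, so the generator expression is exhausted and every future is created up front. Below is a sketch of a variant that keeps only a fixed number of tasks in flight. `bounded_lazy_map` and `max_in_flight` are hypothetical names, and results arrive in completion order rather than input order.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import islice


def bounded_lazy_map(fn, iterable, max_workers=6, max_in_flight=12):
    """Yield fn(item) for each item, pulling from `iterable` only as
    earlier tasks finish, so at most `max_in_flight` futures exist."""
    iterator = iter(iterable)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Prime the pool with the first batch of tasks.
        pending = {
            executor.submit(fn, item)
            for item in islice(iterator, max_in_flight)
        }
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()
            # Top the window back up: one new task per finished task.
            for item in islice(iterator, len(done)):
                pending.add(executor.submit(fn, item))
```

Used with the question's worker, this would look like `for status_code, endpoint in bounded_lazy_map(ping_current_api_endpoint, product(list_of_individual_api_endpoints, repeat=i)): ...`, since each result is still the `(status_code, endpoint)` tuple.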