Python: something like "map" that works on threads

Posted on 2024-09-12 01:18:27

Comments (5)

你怎么敢 2024-09-19 01:18:28

Here is my implementation of a threaded map:

import sys
from queue import Empty, Queue
from threading import Thread

def thread_map(f, iterable, pool=None):
    """
    Just like [f(x) for x in iterable], but the calls run in threads.
    :param f: function to apply to each item
    :param iterable: items to process
    :param pool: number of worker threads; by default, one thread per item
    :return: list of results in input order; a failed call yields its sys.exc_info() tuple
    """
    res = {}
    if pool is None:
        # Unbounded mode: one thread per item.
        def target(arg, num):
            try:
                res[num] = f(arg)
            except BaseException:
                # Deliberately broad: every index must get an entry.
                res[num] = sys.exc_info()

        threads = [Thread(target=target, args=[arg, i]) for i, arg in enumerate(iterable)]
    else:
        # Pooled mode: a fixed number of workers drain a shared queue.
        class WorkerThread(Thread):
            def run(self):
                while True:
                    try:
                        num, arg = queue.get(block=False)
                    except Empty:
                        break  # no work left
                    try:
                        res[num] = f(arg)
                    except BaseException:
                        res[num] = sys.exc_info()

        queue = Queue()
        for i, arg in enumerate(iterable):
            queue.put((i, arg))

        threads = [WorkerThread() for _ in range(pool)]

    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return [res[i] for i in range(len(res))]

西瓜 2024-09-19 01:18:28

The Python queue module (named Queue in Python 2) might help you. Have one thread push all the URLs into a queue with Queue.put(), and let the worker threads get() the URLs one by one.

Python docs: queue (a synchronized queue class)
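
The queue-based approach described above could be sketched roughly as follows. This is only an illustration, not the poster's code: fetch() is a hypothetical stand-in for the real download, the names worker and run are invented for the example, and here the main thread acts as the producer. A None sentinel per worker tells the workers when to stop.

```python
import queue
import threading

def fetch(url):
    # Hypothetical stand-in for the real work,
    # e.g. urllib.request.urlopen(url).read().
    return len(url)

def worker(tasks, results):
    # Pull URLs until the None sentinel arrives.
    while True:
        url = tasks.get()
        if url is None:
            break
        results.put((url, fetch(url)))

def run(urls, num_workers=3):
    urls = list(urls)
    tasks, results = queue.Queue(), queue.Queue()
    threads = [threading.Thread(target=worker, args=(tasks, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for url in urls:
        tasks.put(url)        # producer side: Queue.put()
    for _ in threads:
        tasks.put(None)       # one sentinel per worker
    for t in threads:
        t.join()
    # Results arrive in completion order, so key them by URL.
    return dict(results.get() for _ in urls)

if __name__ == "__main__":
    print(run(["http://a.example/", "http://bb.example/"]))
```

Because results come back in completion order, the sketch returns a dict keyed by URL rather than a list.
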

新一帅帅 2024-09-19 01:18:28

I'd wrap it up in a function (untested):

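import queue
import threading
import urllib.request

def openurl(url, results):
    # Fetch url in a new thread and put (url, getter) on the results queue.
    def starter():
        try:
            result = urllib.request.urlopen(url)
        except Exception as exc:
            # Bind exc as a default argument: Python 3 clears the name
            # when the except block ends.
            def raiser(exc=exc):
                raise exc
            results.put((url, raiser))
        else:
            results.put((url, lambda: result))
    threading.Thread(target=starter).start()

myurls = ... # the list of urls
myqueue = queue.Queue()

# One thread per URL; a plain loop, because map() is lazy in Python 3.
for url in myurls:
    openurl(url, myqueue)

# Results arrive in completion order, not in the order of myurls.
for _ in myurls:
    url, getresult = myqueue.get()
    try:
        result = getresult()
    except Exception as exc:
        print('exception raised: ' + str(exc))
    else:
        pass  # do stuff with result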

情深如许 2024-09-19 01:18:27

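There is a map method in multiprocessing.Pool. It spreads the work across multiple processes.

And if multiple processes aren't your dish, you can use multiprocessing.dummy, which offers the same API backed by threads.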

import urllib.request
import multiprocessing.dummy

# multiprocessing.dummy.Pool has the Pool API but is backed by threads.
p = multiprocessing.dummy.Pool(5)

def f(post):
    return urllib.request.urlopen('http://stackoverflow.com/questions/%d' % post)

print(p.map(f, range(3329361, 3329361 + 5)))

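
One property worth checking offline is that Pool.map returns results in input order even when later items finish first. A minimal sketch, assuming a hypothetical slow_square() in place of a network call (the helper threaded_squares is also invented for the example):

```python
import time
from multiprocessing.dummy import Pool  # thread-backed Pool

def slow_square(n):
    # Hypothetical stand-in for an I/O-bound call such as urlopen().
    # Smaller n sleeps longer, so later items tend to finish first.
    time.sleep(0.05 * (5 - n))
    return n * n

def threaded_squares(numbers, workers=5):
    # map() blocks until every call is done and keeps the input order.
    with Pool(workers) as pool:
        return pool.map(slow_square, numbers)

if __name__ == "__main__":
    print(threaded_squares(range(5)))
```
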
等待圉鍢 2024-09-19 01:18:27

Someone recommended I use the futures package for this. I tried it and it seems to work.

http://pypi.python.org/pypi/futures

Here's an example:

"Download many URLs in parallel."

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    return urllib.request.urlopen(url, timeout=timeout).read()

# The futures package became concurrent.futures in the standard library
# (Python 3.2+). submit() returns one Future per call, taking the place of
# the run_to_futures() call from the package's early releases.
with concurrent.futures.ThreadPoolExecutor(50) as executor:
    future_list = [executor.submit(load_url, url, 30) for url in URLS]

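
For a direct threaded equivalent of map, the standard library's concurrent.futures (Python 3.2+) offers ThreadPoolExecutor.map. The sketch below is only an illustration: load() is a hypothetical stand-in for load_url(), and threaded_map and threaded_map_safe are names invented for the example. The second helper uses submit() so that one failing item does not abort the whole batch.

```python
from concurrent.futures import ThreadPoolExecutor

def load(item):
    # Hypothetical stand-in for load_url(); fails on negative input
    # to show how errors surface.
    if item < 0:
        raise ValueError("negative input: %d" % item)
    return item * 2

def threaded_map(func, items, max_workers=5):
    # executor.map() yields results in input order; the first exception
    # is re-raised when its result is consumed.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(func, items))

def threaded_map_safe(func, items, max_workers=5):
    # submit() gives one Future per item, so failures can be handled
    # per item instead of aborting the batch.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(func, item) for item in items]
    results = []
    for future in futures:
        exc = future.exception()
        results.append(exc if exc is not None else future.result())
    return results

if __name__ == "__main__":
    print(threaded_map(load, [1, 2, 3]))
    print(threaded_map_safe(load, [1, -1]))
```

threaded_map_safe returns the exception object in place of the result for a failed item, which is one possible convention among several.
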