Python扭曲:迭代器和yields/inlineCallbacks

发布于 2024-11-06 06:36:44 字数 1121 浏览 0 评论 0原文

各位, 我完全困惑了,所以我什至可能没有正确地询问问题,但这里是:

我有一个使用 inlineCallbacks 的扭曲应用程序。现在我需要定义一个迭代器,这意味着生成器将返回给调用者。但是,迭代器不能被 inlineCallbacks 修饰,可以吗?如果没有,那么我该如何编写这样的代码。

只是澄清一下:目标是 process_loop 需要每隔(比如 5 秒)调用一次,它只能处理其中的一大块(比如 10 秒),然后必须放手。然而,要知道 10 个块(存储在缓存中,这是一个字典的字典),它需要调用一个返回 deferred 的函数。

@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k,v in cachedvalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'

Folks,
Am thoroughly confused, so it's possible I am not even asking things correctly, but here goes:

I have a twisted application using inlineCallbacks. Now I need to define an iterator which will mean a generator is returned to the caller. However, the iterator cannot be inlineCallbacks decorated, can it be? If not, then how I do I code something like this.

Just to clarify: the goal is process_loop needs to be called every, say 5, seconds, it can process only ONE chunk of, say 10, and then it has to let go. However, to know that chunk of 10 (stored in cached, which is a dict of a dict), it needs to call a function that returns deferred.

@inlineCallbacks ### can\'t have inlineCallbacks here, right?
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield (call func here which returns deferred)
        if result is True:
            for k,v in cachedvalue.items():
                yield cachename, k, v

@inlineCallbacks
def process_chunk(myiter, num):
    try:
        for i in xrange(num):
            nextval = myiter.next()
            yield some_processing(nextval)
        returnValue(False)
    except StopIteration:
        returnValue(True)

@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    result = yield process_chunk(myiter, 10)
    if not result:
        print 'More left'
        reactor.callLater(5, process_loop, cached)
    else:
        print 'All done'

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

庆幸我还是我 2024-11-13 06:36:44

你说得对,你无法在 cacheiter 中表达你想要表达的内容。 inlineCallbacks 装饰器不会让您拥有返回迭代器的函数。如果用它修饰一个函数,那么结果是一个始终返回 Deferred 的函数。这就是它的用途。

造成这一问题的部分原因是迭代器不能很好地处理异步代码。如果在生成迭代器的元素时涉及到 Deferred,那么从迭代器中出来的元素将首先成为 Deferred。

您可能会做这样的事情来解释这一点:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

这可以工作,但看起来特别奇怪。由于生成器只能屈服于其调用者(例如,不能屈服于其调用者的调用者),因此 some_jobs 迭代器对此无​​能为力;只有 process_work 中的词法代码可以产生一个 Deferred 到 inlineCallbacks 提供的蹦床来等待。

如果您不介意这种模式,那么我们可以将您的代码想象成如下所示:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

不过,您可以采取的另一种方法是使用 twisted.internet.task.cooperatecooperate 接受一个迭代器并使用它,假设使用它的成本可能很高,并将作业分解为多个反应器迭代。从上面获取cacheiter的定义:

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()

You're right that you can't express what you want to express in cacheiter. The inlineCallbacks decorator won't let you have a function that returns an iterator. If you decorate a function with it, then the result is a function that always returns a Deferred. That's what it is for.

Part of what makes this difficult is that iterators don't work well with asynchronous code. If there's a Deferred involved in producing the elements of your iterator, then the elements that come out of your iterator are going to be Deferreds first.

You might do something like this to account for that:

@inlineCallbacks
def process_work():
    for element_deferred in some_jobs:
        element = yield element_deferred
        work_on(element)

This can work, but it looks particularly weird. Since generators can only yield to their caller (not, for example, to their caller's caller), the some_jobs iterator can't do anything about this; only code lexically within process_work can yield a Deferred to the inlineCallbacks-provided trampoline to wait on.

If you don't mind this pattern, then we could imaging your code being written something like:

from twisted.internet.task import deferLater
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor

class cacheiter(object):
    def __init__(self, cached):
        self._cached = iter(cached.items())
        self._remaining = []

    def __iter__(self):
        return self


    @inlineCallbacks
    def next(self):
        # First re-fill the list of synchronously-producable values if it is empty
        if not self._remaining:
            for name, value in self._cached:
                # Wait on this Deferred to determine if this cache item should be included
                if (yield check_condition(name, value)):
                    # If so, put all of its values into the value cache so the next one
                    # can be returned immediately next time this method is called.
                    self._remaining.extend([(name, k, v) for (k, v) in value.items()])

        # Now actually give out a value, if there is one.
        if self._remaining:
            returnValue(self._remaining.pop())

        # Otherwise the entire cache has been visited and the iterator is complete.
        # Sadly we cannot signal completion with StopIteration, because the iterator
        # protocol isn't going to add an errback to this Deferred and check for
        # StopIteration.  So signal completion with a simple None value.
        returnValue(None)


@inlineCallbacks
def process_chunk(myiter, num):
    for i in xrange(num):
        nextval = yield myiter.next()
        if nextval is None:
            # The iterator signaled completion via the special None value.
            # Processing is complete.
            returnValue(True)
        # Otherwise process the value.
        yield some_processing(nextval)

    # Indicate there is more processing to be done.
    returnValue(False)


def sleep(sec):
    # Simple helper to delay asynchronously for some number of seconds.
    return deferLater(reactor, sec, lambda: None)


@inlineCallbacks
def process_loop(cached):
    myiter = cacheiter(cached)
    while True:
        # Loop processing 10 items from myiter at a time, until process_chunk signals
        # there are no values left.
        result = yield process_chunk(myiter, 10)
        if result:
            print 'All done'
            break

        print 'More left'
        # Insert the 5 second delay before starting on the next chunk.
        yield sleep(5)

d = process_loop(cached)

Another approach you might be able to take, though, is to use twisted.internet.task.cooperate. cooperate takes an iterator and consumes it, assuming that consuming it is potentially costly, and splitting up the job over multiple reactor iterations. Taking the definition of cacheiter from above:

from twisted.internet.task import cooperate

def process_loop(cached):
    finished = []

    def process_one(value):
        if value is None:
            finished.append(True)
        else:
            return some_processing(value)

    myiter = cacheiter(cached)

    while not finished:
        value_deferred = myiter.next()
        value_deferred.addCallback(process_one)
        yield value_deferred

task = cooperate(process_loop(cached))
d = task.whenDone()
看透却不说透 2024-11-13 06:36:44

我认为你正在尝试这样做:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k,v in cachedvalue.items())
            returnValue(gen)

当 genexp 无法表达你想要返回的内容时,你可以编写一个闭包:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachedvalue=cachedvalue, cachename=cachename):
                for k,v in cachedvalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it

I think you're trying to do this:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred() # some deferred you'd like evaluated
        if result is True:
            # here you want to return something, so you have to use returnValue
            # the generator you want to return can be written as a generator expression
            gen = ((cachename, k, v) for k,v in cachedvalue.items())
            returnValue(gen)

When a genexp can't express what you're trying to return you can write a closure:

@inlineCallbacks
def cacheiter(cached):
    for cachename,cachevalue in cached.items():
        result = yield some_deferred()
        if result is True:
            # define the generator, saving the current values of the cache
            def gen(cachedvalue=cachedvalue, cachename=cachename):
                for k,v in cachedvalue.items():
                    yield cachename, k, v
            returnValue(gen()) # return it
潇烟暮雨 2024-11-13 06:36:44

尝试将迭代器编写为 DeferredGenerator

Try writing your iterator as a DeferredGenerator.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文