有没有比标准“递归”更快的方法从 python 中的树状结构获取子树?

发布于 2024-09-11 16:48:09 字数 1035 浏览 9 评论 0原文

让我们假设以下数据结构具有三个 numpy 数组 (id,parent_id)(根元素的parent_id 为 -1):

import numpy as np
class MyStructure(object):
  def __init__(self):
    """
    Default structure for now:

          1
         / \
        2   3
           / \
          4   5
    """
    self.ids = np.array([1,2,3,4,5])
    self.parent_ids = np.array([-1, 1, 1, 3, 3])

  def id_successors(self, idOfInterest):
    """
    Return logical index.
    """
    return self.parent_ids == idOfInterest

  def subtree(self, newRootElement):
    """
    Return logical index pointing to elements of the subtree.
    """
    init_vector = np.zeros(len(self.ids), bool)
    init_vector[np.where(self.ids==newRootElement)[0]] = 1
    if sum(self.id_successors(newRootElement))==0:
      return init_vector
    else:
      subtree_vec = init_vector
      for sucs in self.ids[self.id_successors(newRootElement)==1]:
        subtree_vec += self.subtree(sucs)
      return subtree_vec

对于许多 ids (>1000) 来说,此获取非常慢。有没有更快的方法来实现?

Let's assume the following data structur with three numpy arrays (id, parent_id) (parent_id of the root element is -1):

import numpy as np
class MyStructure(object):
  def __init__(self):
    """
    Default structure for now:

          1
         / \
        2   3
           / \
          4   5
    """
    self.ids = np.array([1,2,3,4,5])
    self.parent_ids = np.array([-1, 1, 1, 3, 3])

  def id_successors(self, idOfInterest):
    """
    Return logical index.
    """
    return self.parent_ids == idOfInterest

  def subtree(self, newRootElement):
    """
    Return logical index pointing to elements of the subtree.
    """
    init_vector = np.zeros(len(self.ids), bool)
    init_vector[np.where(self.ids==newRootElement)[0]] = 1
    if sum(self.id_successors(newRootElement))==0:
      return init_vector
    else:
      subtree_vec = init_vector
      for sucs in self.ids[self.id_successors(newRootElement)==1]:
        subtree_vec += self.subtree(sucs)
      return subtree_vec

This get's really slow for many ids (>1000). Is there a faster way to implement that?

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(4

野生奥特曼 2024-09-18 16:48:09

如果您使用的是 Python 2.6,您是否尝试过使用 psyco 模块?有时它可以显着提高代码速度。

您是否考虑过递归数据结构:列表?

您的示例也是标准列表:

[1,2, [3, [4],[5]]]

[1、[2、无、无]、[3、[4、无、无]、[5、无、无]]]

由我的 漂亮打印机

[1, 
  [2, None, None], 
  [3, 
    [4, None, None], 
    [5, None, None]]]

子树已准备就绪,需要花费一些时间将值插入到正确的树中。还值得检查是否 heapq 模块 适合您的需求。

Guido 本人也在 http://python.org/doc/essays 中给出了一些关于遍历和树的见解/graphs.html,也许你已经意识到了。

这里有一些看起来很高级的树的东西,实际上是为 Python 提议作为基本列表类型替换的,但在该函数中被拒绝了。 Blist 模块

Have you tried to use psyco module if you are using Python 2.6? It can sometimes do dramatic speed up of code.

Have you considered recursive data structure: list?

Your example is also as standard list:

[1, 2, [3, [4],[5]]]

or

[1, [2, None, None], [3, [4, None, None],[5, None, None]]]

By my pretty printer:

[1, 
  [2, None, None], 
  [3, 
    [4, None, None], 
    [5, None, None]]]

Subtrees are ready there, cost you some time inserting values to right tree. Also worth while to check if heapq module fits your needs.

Also Guido himself gives some insight on traversing and trees in http://python.org/doc/essays/graphs.html, maybe you are aware of it.

Here is some advanced looking tree stuff, actually proposed for Python as basic list type replacement, but rejected in that function. Blist module

放手` 2024-09-18 16:48:09

我认为这并不是递归本身对您造成伤害,而是每一步都有大量非常广泛的操作(针对所有元素)。考虑:

init_vector[np.where(self.ids==newRootElement)[0]] = 1

对所有元素进行扫描,计算每个匹配元素的索引,然后仅使用第一个元素的索引。这个特定的操作可用作列表、元组和数组的方法索引 - 并且速度更快。如果 ID 是唯一的,那么 init_vector 就是 ids==newRootElement 。

if sum(self.id_successors(newRootElement))==0:

再次对每个元素进行线性扫描,然后对整个数组进行缩减,只是为了检查是否存在任何匹配项。对于这种类型,请使用 any操作,但我们甚至不需要对所有元素进行检查 - “if newRootElement not in self.parent_ids”可以完成这项工作,但这不是必需的,因为对空列表执行 for 循环是完全有效的。

最后是最后一个循环:

for sucs in self.ids[self.id_successors(newRootElement)==1]:

这一次,重复 id_successors 调用,然后将结果与 1 进行不必要的比较。只有在此之后才会进行递归,确保每个分支重复所有上述操作(对于不同的 newRootElement)。

整个代码就是一个单向树的反向遍历。我们有父母,也需要孩子。如果我们要进行广泛的操作,例如 numpy 的设计目的,我们最好让它们计数 - 因此我们关心的唯一操作是为每个父母建立一个孩子列表。通过一次迭代来做到这一点并不难:

import collections
children=collections.defaultdict(list)
for i,p in zip(ids,parent_ids):
  children[p].append(i)

def subtree(i):
  return i, map(subtree, children[i])

您需要的确切结构将取决于更多因素,例如树更改的频率、树的大小、分支的数量以及您需要请求的子树的大小和数量。例如,上面的字典+列表结构的内存效率并不是很高。您的示例也已排序,这可以使操作更加容易。

I think it's not the recursion as such that's hurting you, but the multitude of very wide operations (over all elements) for every step. Consider:

init_vector[np.where(self.ids==newRootElement)[0]] = 1

That runs a scan through all elements, calculates the index of every matching element, then uses only the index of the first one. This particular operation is available as the method index for lists, tuples, and arrays - and faster there. If IDs are unique, init_vector is simply ids==newRootElement anyway.

if sum(self.id_successors(newRootElement))==0:

Again a linear scan of every element, then a reduction on the whole array, just to check if any matches are there. Use any for this type of operation, but once again we don't even need to do the check on all elements - "if newRootElement not in self.parent_ids" does the job, but it's not necessary as it's perfectly valid to do a for loop over an empty list.

Finally there's the last loop:

for sucs in self.ids[self.id_successors(newRootElement)==1]:

This time, an id_successors call is repeated, and then the result is compared to 1 needlessly. Only after that comes the recursion, making sure all the above operations are repeated (for different newRootElement) for each branch.

The whole code is a reversed traversal of a unidirectional tree. We have parents and need children. If we're to do wide operations such as numpy is designed for, we'd best make them count - and thus the only operation we care about is building a list of children per parent. That's not very hard to do with one iteration:

import collections
children=collections.defaultdict(list)
for i,p in zip(ids,parent_ids):
  children[p].append(i)

def subtree(i):
  return i, map(subtree, children[i])

The exact structure you need will depend on more factors, such as how often the tree changes, how large it is, how much it branches, and how large and many subtrees you need to request. The dictionary+list structure above isn't terribly memory efficient, for instance. Your example is also sorted, which could make the operation even easier.

我一向站在原地 2024-09-18 16:48:09

理论上,每个算法都可以迭代和递归方式编写。但这是一个谬误(就像图灵完备性一样)。在实践中,通过迭代遍历任意嵌套的树通常是不可行的。我怀疑还有很多需要优化的地方(至少你正在就地修改 subtree_vec )。对数千个元素执行 x 本质上是非常昂贵的,无论您是迭代还是递归执行。在具体实现上至多存在一些可能的微观优化,这最多将产生<5%的改进。如果您多次需要相同的数据,最好的选择是缓存/记忆。也许有人对你的特定树结构有一种奇特的 O(log n) 算法,我什至不知道是否可能(我假设不可能,但树操作不是我的工作人员)。

In theory, every algorithm can be written iteratively as well as recursively. But this is a fallacy (like Turing-completeness). In practice, walking an arbitrarily-nested tree via iteration is generally not feasible. I doubt there is much to optimize (at least you're modifying subtree_vec in-place). Doing x on thousands of elements is inherently damn expensive, no matter whether you do it iteratively or recursively. At most there are a few micro-optimizations possible on the concrete implementation, which will at most yield <5% improvement. Best bet would be caching/memoization, if you need the same data several times. Maybe someone has a fancy O(log n) algorithm for your specific tree structure up their sleeve, I don't even know if one is possible (I'd assume no, but tree manipulation isn't my staff of life).

装纯掩盖桑 2024-09-18 16:48:09

这是我的答案(在无法访问您的类的情况下编写,因此界面略有不同,但我按原样附加它,以便您可以测试它是否足够快):
======================文件graph_array.py======================== ===


import collections
import numpy

def find_subtree(pids, subtree_id):
    N = len(pids)
    assert 1 <= subtree_id <= N

    subtreeids = numpy.zeros(pids.shape, dtype=bool)
    todo = collections.deque([subtree_id])

    iter = 0
    while todo:
        id = todo.popleft()
        assert 1 <= id <= N
        subtreeids[id - 1] = True

        sons = (pids == id).nonzero()[0] + 1
        #print 'id={0} sons={1} todo={2}'.format(id, sons, todo)
        todo.extend(sons)

        iter = iter+1
        if iter>N:
            raise ValueError()

    return subtreeids

=========================文件graph_array_test.py====================== ======


import numpy
from graph_array import find_subtree

def _random_graph(n, maxsons):
    import random
    pids = numpy.zeros(n, dtype=int)
    sons = numpy.zeros(n, dtype=int)
    available = []
    for id in xrange(1, n+1):
        if available:
            pid = random.choice(available)

            sons[pid - 1] += 1
            if sons[pid - 1] == maxsons:
                available.remove(pid)
        else:
            pid = -1
        pids[id - 1] = pid
        available.append(id)
    assert sons.max() <= maxsons
    return pids

def verify_subtree(pids, subtree_id, subtree):
    ids = set(subtree.nonzero()[0] + 1)
    sons = set(ids) - set([subtree_id])
    fathers = set(pids[id - 1] for id in sons)
    leafs = set(id for id in ids if not (pids == id).any())
    rest = set(xrange(1, pids.size+1)) - fathers - leafs
    assert fathers & leafs == set()
    assert fathers | leafs == ids
    assert ids & rest == set()

def test_linear_graph_gen(n, genfunc, maxsons):
    assert maxsons == 1
    pids = genfunc(n, maxsons)

    last = -1
    seen = set()
    for _ in xrange(pids.size):
        id = int((pids == last).nonzero()[0]) + 1
        assert id not in seen
        seen.add(id)
        last = id
    assert seen == set(xrange(1, pids.size + 1))

def test_case1():
    """
            1
           / \
          2   4
         /
        3
    """
    pids = numpy.array([-1, 1, 2, 1])

    subtrees = {1: [True, True, True, True],
                2: [False, True, True, False],
                3: [False, False, True, False],
                4: [False, False, False, True]}

    for id in xrange(1, 5):
        sub = find_subtree(pids, id)
        assert (sub == numpy.array(subtrees[id])).all()
        verify_subtree(pids, id, sub)

def test_random(n, genfunc, maxsons):
    pids = genfunc(n, maxsons)
    for subtree_id in numpy.arange(1, n+1):
        subtree = find_subtree(pids, subtree_id)
        verify_subtree(pids, subtree_id, subtree)

def test_timing(n, genfunc, maxsons):
    import time
    pids = genfunc(n, maxsons)
    t = time.time()
    for subtree_id in numpy.arange(1, n+1):
        subtree = find_subtree(pids, subtree_id)
    t = time.time() - t
    print 't={0}s = {1:.2}ms/subtree = {2:.5}ms/subtree/node '.format(
        t, t / n * 1000, t / n**2 * 1000),

def pytest_generate_tests(metafunc):
    if 'case' in metafunc.function.__name__:
        return
    ns = [1, 2, 3, 4, 5, 10, 20, 50, 100, 1000]
    if 'timing' in metafunc.function.__name__:
        ns += [10000, 100000, 1000000]
        pass
    for n in ns:
        func = _random_graph
        for maxsons in sorted(set([1, 2, 3, 4, 5, 10, (n+1)//2, n])):
            metafunc.addcall(
                funcargs=dict(n=n, genfunc=func, maxsons=maxsons),
                id='n={0} {1.__name__}/{2}'.format(n, func, maxsons))
            if 'linear' in metafunc.function.__name__:
                break

===================py.test --tb=short -v -s test_graph_array.py========== ==

...
test_graph_array.py:72: test_timing[n=1000 _random_graph/1] t=13.4850590229s = 13.0ms/subtree = 0.013485ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/2] t=0.318281888962s = 0.32ms/subtree = 0.00031828ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/3] t=0.265519142151s = 0.27ms/subtree = 0.00026552ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/4] t=0.24147105217s = 0.24ms/subtree = 0.00024147ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/5] t=0.211434841156s = 0.21ms/subtree = 0.00021143ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/10] t=0.178458213806s = 0.18ms/subtree = 0.00017846ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/500] t=0.209936141968s = 0.21ms/subtree = 0.00020994ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/1000] t=0.245707988739s = 0.25ms/subtree = 0.00024571ms/subtree/node PASS
...


这里获取每棵树的每个子树,有趣的值是提取树的平均时间:每个子树约 0.2 毫秒,严格线性树除外。我不确定这里发生了什么。

This is my answer (written without access to your class, so the interface is slightly different, but I'm attaching it as is so that you can test if it is fast enough):
=======================file graph_array.py==========================


import collections
import numpy

def find_subtree(pids, subtree_id):
    N = len(pids)
    assert 1 <= subtree_id <= N

    subtreeids = numpy.zeros(pids.shape, dtype=bool)
    todo = collections.deque([subtree_id])

    iter = 0
    while todo:
        id = todo.popleft()
        assert 1 <= id <= N
        subtreeids[id - 1] = True

        sons = (pids == id).nonzero()[0] + 1
        #print 'id={0} sons={1} todo={2}'.format(id, sons, todo)
        todo.extend(sons)

        iter = iter+1
        if iter>N:
            raise ValueError()

    return subtreeids

=======================file graph_array_test.py==========================


import numpy
from graph_array import find_subtree

def _random_graph(n, maxsons):
    import random
    pids = numpy.zeros(n, dtype=int)
    sons = numpy.zeros(n, dtype=int)
    available = []
    for id in xrange(1, n+1):
        if available:
            pid = random.choice(available)

            sons[pid - 1] += 1
            if sons[pid - 1] == maxsons:
                available.remove(pid)
        else:
            pid = -1
        pids[id - 1] = pid
        available.append(id)
    assert sons.max() <= maxsons
    return pids

def verify_subtree(pids, subtree_id, subtree):
    ids = set(subtree.nonzero()[0] + 1)
    sons = set(ids) - set([subtree_id])
    fathers = set(pids[id - 1] for id in sons)
    leafs = set(id for id in ids if not (pids == id).any())
    rest = set(xrange(1, pids.size+1)) - fathers - leafs
    assert fathers & leafs == set()
    assert fathers | leafs == ids
    assert ids & rest == set()

def test_linear_graph_gen(n, genfunc, maxsons):
    assert maxsons == 1
    pids = genfunc(n, maxsons)

    last = -1
    seen = set()
    for _ in xrange(pids.size):
        id = int((pids == last).nonzero()[0]) + 1
        assert id not in seen
        seen.add(id)
        last = id
    assert seen == set(xrange(1, pids.size + 1))

def test_case1():
    """
            1
           / \
          2   4
         /
        3
    """
    pids = numpy.array([-1, 1, 2, 1])

    subtrees = {1: [True, True, True, True],
                2: [False, True, True, False],
                3: [False, False, True, False],
                4: [False, False, False, True]}

    for id in xrange(1, 5):
        sub = find_subtree(pids, id)
        assert (sub == numpy.array(subtrees[id])).all()
        verify_subtree(pids, id, sub)

def test_random(n, genfunc, maxsons):
    pids = genfunc(n, maxsons)
    for subtree_id in numpy.arange(1, n+1):
        subtree = find_subtree(pids, subtree_id)
        verify_subtree(pids, subtree_id, subtree)

def test_timing(n, genfunc, maxsons):
    import time
    pids = genfunc(n, maxsons)
    t = time.time()
    for subtree_id in numpy.arange(1, n+1):
        subtree = find_subtree(pids, subtree_id)
    t = time.time() - t
    print 't={0}s = {1:.2}ms/subtree = {2:.5}ms/subtree/node '.format(
        t, t / n * 1000, t / n**2 * 1000),

def pytest_generate_tests(metafunc):
    if 'case' in metafunc.function.__name__:
        return
    ns = [1, 2, 3, 4, 5, 10, 20, 50, 100, 1000]
    if 'timing' in metafunc.function.__name__:
        ns += [10000, 100000, 1000000]
        pass
    for n in ns:
        func = _random_graph
        for maxsons in sorted(set([1, 2, 3, 4, 5, 10, (n+1)//2, n])):
            metafunc.addcall(
                funcargs=dict(n=n, genfunc=func, maxsons=maxsons),
                id='n={0} {1.__name__}/{2}'.format(n, func, maxsons))
            if 'linear' in metafunc.function.__name__:
                break

===================py.test --tb=short -v -s test_graph_array.py============

...
test_graph_array.py:72: test_timing[n=1000 _random_graph/1] t=13.4850590229s = 13.0ms/subtree = 0.013485ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/2] t=0.318281888962s = 0.32ms/subtree = 0.00031828ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/3] t=0.265519142151s = 0.27ms/subtree = 0.00026552ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/4] t=0.24147105217s = 0.24ms/subtree = 0.00024147ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/5] t=0.211434841156s = 0.21ms/subtree = 0.00021143ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/10] t=0.178458213806s = 0.18ms/subtree = 0.00017846ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/500] t=0.209936141968s = 0.21ms/subtree = 0.00020994ms/subtree/node PASS
test_graph_array.py:72: test_timing[n=1000 _random_graph/1000] t=0.245707988739s = 0.25ms/subtree = 0.00024571ms/subtree/node PASS
...


Here every subtree of every tree is taken, and the interesting value is the mean time to extract a tree: ~0.2ms per subtree, except for strictly linear trees. I'm not sure what is happening here.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文