Lazy data-flow (spreadsheet-like) properties with dependencies in Python

Posted on 2024-12-19 00:12:27

My problem is the following: I have some Python classes whose properties are derived from other properties. The derived values should be cached once they are calculated, and the cached results should be invalidated each time a base property changes.

I could do this manually, but that seems quite difficult to maintain as the number of properties grows. So I would like to have something like Makefile rules inside my objects to automatically keep track of what needs to be recalculated.

The desired syntax and behaviour should be something like this:

# this does dirty magic, like generating the reverse dependency graph,
# and preparing the setters that invalidate the cached values
@dataflow_class
class Test(object):

    def calc_a(self):
        return self.b + self.c

    def calc_c(self):
        return self.d * 2

    a = managed_property(calculate=calc_a, depends_on=('b', 'c'))
    b = managed_property(default=0)
    c = managed_property(calculate=calc_c, depends_on=('d',))
    d = managed_property(default=0)


t = Test()

print(t.a)
# a has not been initialized, so it calls calc_a
# gets b value
# c has not been initialized, so it calls calc_c
# c value is calculated and stored in t.__c
# a value is calculated and stored in t.__a

t.b = 1
# invalidates the calculated value stored in t.__a

print(t.a)
# a has been invalidated, so it calls calc_a
# gets b value
# gets c value, from t.__c
# a value is calculated and stored in t.__a

print(t.a)
# gets value from t.__a

t.d = 2
# invalidates the calculated values stored in t.__a and t.__c

So, is there something like this already available or should I start implementing my own? In the second case, suggestions are welcome :-)

Comments (3)

川水往事 2024-12-26 00:12:27

Here, this should do the trick.
The descriptor mechanism (through which the language implements "property") is
more than enough for what you want.

If the code below does not work in some corner cases, just write to me.

class DependentProperty(object):
    def __init__(self, calculate=None, default=None, depends_on=()):
        # "name" and "dependence_tree" properties are attributes
        # set up by the metaclass of the owner class
        if calculate:
            self.calculate = calculate
        else:
            self.default = default
        self.depends_on = set(depends_on)

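    def __get__(self, instance, owner):
        if instance is None:
            # Accessed on the class itself: return the descriptor
            return self
        if hasattr(self, "default"):
            # Base property: the stored value, or the default if never set
            return getattr(instance, "_" + self.name, self.default)
        if not hasattr(instance, "_" + self.name):
            # Cache was invalidated: recompute from the last value assigned
            setattr(instance, "_" + self.name,
                self.calculate(instance, getattr(instance, "_" + self.name + "_last_value")))
        return getattr(instance, "_" + self.name)

    def __set__(self, instance, value):
        if hasattr(self, "default"):
            # Base property: store the value as-is (there is no calculate function)
            setattr(instance, "_" + self.name, value)
        else:
            setattr(instance, "_" + self.name + "_last_value", value)
            setattr(instance, "_" + self.name, self.calculate(instance, value))
        # Drop the cached value of every property that depends on this one
        for attr in self.dependence_tree[self.name]:
            delattr(instance, attr)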

    def __delete__(self, instance):
        try:
            delattr(instance, "_" + self.name)
        except AttributeError:
            pass


def assemble_tree(name,  dict_, all_deps = None):
    if all_deps is None:
        all_deps = set()
    for dependance in dict_[name].depends_on:
        all_deps.add(dependance)
        assemble_tree(dependance, dict_, all_deps)
    return all_deps

def invert_tree(tree):
    new_tree = {}
    for key, val in tree.items():
        for dependence in val:
            if dependence not in new_tree:
                new_tree[dependence] = set()
            new_tree[dependence].add(key)
    return new_tree

class DependenceMeta(type):
    def __new__(cls, name, bases, dict_):
        dependence_tree = {}
        properties = []
        for key, val in dict_.items():
            if not isinstance(val, DependentProperty):
                continue
            val.name = key
            val.dependence_tree = dependence_tree
            dependence_tree[key] = set()
            properties.append(val)
        inverted_tree = {}
        for property in properties:
            inverted_tree[property.name] = assemble_tree(property.name, dict_)
        dependence_tree.update(invert_tree(inverted_tree))
        return type.__new__(cls, name, bases, dict_)


if __name__ == "__main__":
    # Example and visual test (Python 3 syntax):

    class Bla(metaclass=DependenceMeta):

        def calc_b(self, x):
            print("Calculating b")
            return x + self.a

        def calc_c(self, x):
            print("Calculating c")
            return x + self.b

        a = DependentProperty(default=10)
        b = DependentProperty(depends_on=("a",), calculate=calc_b)
        c = DependentProperty(depends_on=("b",), calculate=calc_c)

    bla = Bla()
    bla.b = 5   # computes and caches b
    bla.c = 10  # computes and caches c

    print(bla.a, bla.b, bla.c)
    bla.b = 10  # recomputes b and invalidates the cached c
    print(bla.b)
    print(bla.c)  # c is recomputed lazily here
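
On Python 3.6 and later, a similar effect can probably be had without a metaclass, because a descriptor is told its attribute name through `__set_name__`. The following is only a minimal sketch under stated assumptions, not the code from the answer above: the class name `LazyProperty` and the `_cache_` prefix are made up for illustration, `calculate` takes only `self` (as in the question's desired syntax), the dependency graph is assumed to be acyclic, and only properties defined directly on the instance's own class are considered.

```python
class LazyProperty:
    """Cached attribute that is recomputed lazily after a dependency changes.

    Hypothetical helper for illustration only.
    """

    def __init__(self, calculate=None, default=None, depends_on=()):
        self.calculate = calculate
        self.default = default
        self.depends_on = tuple(depends_on)

    def __set_name__(self, owner, name):
        # Called automatically when the owning class is created (Python 3.6+)
        self.name = name
        self.slot = "_cache_" + name

    def __get__(self, instance, owner=None):
        if instance is None:
            return self
        cache = instance.__dict__
        if self.slot not in cache:
            # First access, or the cache was invalidated: compute and store
            cache[self.slot] = (self.calculate(instance)
                                if self.calculate else self.default)
        return cache[self.slot]

    def __set__(self, instance, value):
        instance.__dict__[self.slot] = value
        self._invalidate_dependents(instance)

    def _invalidate_dependents(self, instance):
        # Drop the cache of every property that lists this one in depends_on,
        # then recurse so that indirect dependents are dropped as well.
        for other in vars(type(instance)).values():
            if isinstance(other, LazyProperty) and self.name in other.depends_on:
                instance.__dict__.pop(other.slot, None)
                other._invalidate_dependents(instance)


class Test:
    def calc_a(self):
        return self.b + self.c

    def calc_c(self):
        return self.d * 2

    a = LazyProperty(calculate=calc_a, depends_on=("b", "c"))
    b = LazyProperty(default=0)
    c = LazyProperty(calculate=calc_c, depends_on=("d",))
    d = LazyProperty(default=0)


t = Test()
print(t.a)  # computed from the defaults of b and d
t.b = 1     # invalidates a
t.d = 2     # invalidates c and, through c, a
print(t.a)  # recomputed on demand
```

Walking the class dictionary on every assignment keeps the sketch short; precomputing the reverse dependency graph once, as the answer above does in its metaclass, avoids that repeated scan.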
爺獨霸怡葒院 2024-12-26 00:12:27

"I would like to have something like Makefile rules"

Then use one! You may consider this model:

  • one rule = one Python file
  • one result = one *.data file
  • the pipeline is implemented as a makefile or with another dependency analysis tool (CMake, SCons)

The hardware test team in our company uses such a framework for intensive exploratory tests:

  • you can integrate other languages and tools easily
  • you get a stable and proven solution
  • computations may be distributed across multiple CPUs/computers
  • you track dependencies on both values and rules
  • debugging intermediate values is easy

The (big) downside of this method is that you have to give up Python's import statement, because it creates an implicit (and untracked) dependency (there are workarounds for this).
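
To make the file-based model concrete, here is a rough sketch of the timestamp comparison that make-like tools perform, written in Python purely for illustration. In the setup described above, make (or CMake, SCons) would do this job itself; the function names `is_stale`, `build` and `double_rule` are hypothetical and do not come from any existing tool.

```python
import os


def is_stale(target, sources):
    """True if `target` is missing or older than any file in `sources`."""
    if not os.path.exists(target):
        return True
    target_mtime = os.path.getmtime(target)
    return any(os.path.getmtime(src) > target_mtime for src in sources)


def build(target, sources, rule):
    """Run `rule(sources, target)` only when `target` is out of date.

    Returns True if the rule was run, False if the existing file was reused.
    """
    if is_stale(target, sources):
        rule(sources, target)
        return True
    return False


def double_rule(sources, target):
    """Hypothetical rule: read a number from the first source, write twice it."""
    with open(sources[0]) as src:
        value = float(src.read())
    with open(target, "w") as out:
        out.write(str(value * 2))
```

With this, `build("c.data", ["d.data"], double_rule)` would recompute `c.data` only after `d.data` has changed, which mirrors the `c = d * 2` rule from the question at the level of files.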

驱逐舰岛风号 2024-12-26 00:12:27
import collections

sentinel=object()

class ManagedProperty(object):
    '''
    If deptree = {'a': set(['b', 'c'])}, then ManagedProperties `b` and
    `c` will be reset whenever `a` is modified.
    '''
    def __init__(self,property_name,calculate=None,depends_on=tuple(),
                 default=sentinel):
        self.property_name=property_name
        self.private_name='_'+property_name 
        self.calculate=calculate
        self.depends_on=depends_on
        self.default=default
    def __get__(self,obj,objtype):
        if obj is None:
            # Allows getattr(cls,mprop) to return the ManagedProperty instance
            return self
        try:
            return getattr(obj,self.private_name)
        except AttributeError:
            result=(getattr(obj,self.calculate)()
                    if self.default is sentinel else self.default)
            setattr(obj,self.private_name,result)
            return result
    def __set__(self,obj,value):
        # obj._dependencies is defined by @register
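        # Invalidate every property that depends on this one. An explicit loop
        # is used because map() is lazy on Python 3 and would delete nothing.
        for name in getattr(obj,'_dependencies').get(self.property_name,tuple()):
            delattr(obj,name)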
        setattr(obj,self.private_name,value)        
    def __delete__(self,obj):
        if hasattr(obj,self.private_name):
            delattr(obj,self.private_name)

def register(*mproperties):
    def flatten_dependencies(name, deptree, all_deps=None):
        '''
        A deptree such as {'c': set(['a']), 'd': set(['c'])} means
        'a' depends on 'c' and 'c' depends on 'd'.

        Given such a deptree, flatten_dependencies('d', deptree) returns the set
        of all property_names that depend on 'd' (i.e. set(['a','c']) in the
        above case).
        '''
        if all_deps is None:
            all_deps = set()
        for dep in deptree.get(name,tuple()):
            all_deps.add(dep)
            flatten_dependencies(dep, deptree, all_deps)
        return all_deps

    def classdecorator(cls):
        deptree=collections.defaultdict(set)
        for mprop in mproperties:
            setattr(cls,mprop.property_name,mprop)
        # Find all ManagedProperties in dir(cls). Note that some of these may be
        # inherited from bases of cls; they may not be listed in mproperties.
        # Doing it this way allows ManagedProperties to be overridden by subclasses.
        for propname in dir(cls):
            mprop=getattr(cls,propname)
            if not isinstance(mprop,ManagedProperty):
                continue
            for underlying_prop in mprop.depends_on:
                deptree[underlying_prop].add(mprop.property_name)

        # Flatten the dependency tree so no recursion is necessary. If one were
        # to use recursion instead, then a naive algorithm would make duplicate
        # calls to __delete__. By flattening the tree, there are no duplicate
        # calls to __delete__.
        dependencies={key:flatten_dependencies(key,deptree)
                      for key in deptree.keys()}
        setattr(cls,'_dependencies',dependencies)
        return cls
    return classdecorator
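
The two graph steps performed inside `register` (inverting the `depends_on` declarations, then flattening them into transitive sets) can be tried in isolation. The sketch below is a standalone illustration with made-up helper names `invert` and `flatten`; it assumes the dependency graph has no cycles and uses the same four properties as the question.

```python
from collections import defaultdict


def invert(depends_on):
    """Map each property to the set of properties that directly depend on it."""
    dependents = defaultdict(set)
    for prop, bases in depends_on.items():
        for base in bases:
            dependents[base].add(prop)
    return dependents


def flatten(name, dependents, seen=None):
    """Collect every property that depends on `name`, directly or indirectly."""
    if seen is None:
        seen = set()
    for dep in dependents.get(name, ()):
        if dep not in seen:
            seen.add(dep)
            flatten(dep, dependents, seen)
    return seen


# "a depends on b and c", "c depends on d"
depends_on = {"a": ("b", "c"), "c": ("d",)}
dependents = invert(depends_on)
flat = {key: flatten(key, dependents) for key in list(dependents)}
print(flat)
```

Here `flat["d"]` contains both `c` and `a`, so assigning to `d` can invalidate both caches in a single pass without visiting `a` twice.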

These are the unit tests I used to verify its behavior.

if __name__ == "__main__":
    import unittest
    import sys
    def count(meth):
        def wrapper(self,*args):
            countname=meth.__name__+'_count'
            setattr(self,countname,getattr(self,countname,0)+1)
            return meth(self,*args)
        return wrapper

    class Test(unittest.TestCase):
        def setUp(self):
            @register(
                ManagedProperty('d',default=0),
                ManagedProperty('b',default=0),
                ManagedProperty('c',calculate='calc_c',depends_on=('d',)),
                ManagedProperty('a',calculate='calc_a',depends_on=('b','c')))
            class Foo(object):
                @count
                def calc_a(self):
                    return self.b + self.c
                @count
                def calc_c(self):
                    return self.d * 2
            @register(ManagedProperty('c',calculate='calc_c',depends_on=('b',)),
                      ManagedProperty('a',calculate='calc_a',depends_on=('b','c')))
            class Bar(Foo):
                @count
                def calc_c(self):
                    return self.b * 3
            self.Foo=Foo
            self.Bar=Bar
            self.foo=Foo()
            self.foo2=Foo()            
            self.bar=Bar()

        def test_two_instances(self):
            self.foo.b = 1
            self.assertEqual(self.foo.a,1)
            self.assertEqual(self.foo.b,1)
            self.assertEqual(self.foo.c,0)
            self.assertEqual(self.foo.d,0)

            self.assertEqual(self.foo2.a,0)
            self.assertEqual(self.foo2.b,0)
            self.assertEqual(self.foo2.c,0)
            self.assertEqual(self.foo2.d,0)


        def test_initialization(self):
            self.assertEqual(self.foo.a,0)
            self.assertEqual(self.foo.calc_a_count,1)
            self.assertEqual(self.foo.a,0)
            self.assertEqual(self.foo.calc_a_count,1)            
            self.assertEqual(self.foo.b,0)
            self.assertEqual(self.foo.c,0)
            self.assertEqual(self.foo.d,0)
            self.assertEqual(self.bar.a,0)
            self.assertEqual(self.bar.b,0)
            self.assertEqual(self.bar.c,0)
            self.assertEqual(self.bar.d,0)

        def test_dependence(self):
            self.assertEqual(self.Foo._dependencies,
                             {'c': set(['a']), 'b': set(['a']), 'd': set(['a', 'c'])})

            self.assertEqual(self.Bar._dependencies,
                             {'c': set(['a']), 'b': set(['a', 'c'])})

        def test_setting_property_updates_dependent(self):
            self.assertEqual(self.foo.a,0)
            self.assertEqual(self.foo.calc_a_count,1)

            self.foo.b = 1
            # invalidates the calculated value stored in foo.a
            self.assertEqual(self.foo.a,1)
            self.assertEqual(self.foo.calc_a_count,2)
            self.assertEqual(self.foo.b,1)
            self.assertEqual(self.foo.c,0)
            self.assertEqual(self.foo.d,0)

            self.foo.d = 2
            # invalidates the calculated values stored in foo.a and foo.c
            self.assertEqual(self.foo.a,5)
            self.assertEqual(self.foo.calc_a_count,3)
            self.assertEqual(self.foo.b,1)
            self.assertEqual(self.foo.c,4)
            self.assertEqual(self.foo.d,2)

            self.assertEqual(self.bar.a,0)
            self.assertEqual(self.bar.calc_a_count,1)
            self.assertEqual(self.bar.b,0)
            self.assertEqual(self.bar.c,0)
            self.assertEqual(self.bar.calc_c_count,1)
            self.assertEqual(self.bar.d,0)

            self.bar.b = 2
            self.assertEqual(self.bar.a,8)
            self.assertEqual(self.bar.calc_a_count,2)
            self.assertEqual(self.bar.b,2)
            self.assertEqual(self.bar.c,6)
            self.assertEqual(self.bar.calc_c_count,2)
            self.assertEqual(self.bar.d,0)

            self.bar.d = 2
            self.assertEqual(self.bar.a,8)
            self.assertEqual(self.bar.calc_a_count,2)            
            self.assertEqual(self.bar.b,2)
            self.assertEqual(self.bar.c,6)
            self.assertEqual(self.bar.calc_c_count,2)
            self.assertEqual(self.bar.d,2)

    sys.argv.insert(1,'--verbose')
    unittest.main(argv=sys.argv)