Python 中的概率分布

发布于 2024-07-13 04:37:21 字数 7105 浏览 4 评论 0原文

我有一堆键,每个键都有一个不太可能的变量。 我想随机选择这些键之一,但我希望不太可能的(键、值)比不太可能(更有可能)的对象更不可能被选择。 我想知道你是否有任何建议,最好是我可以使用的现有 python 模块,否则我需要自己制作。

我已经检查了随机模块; 它似乎没有提供这一点。

我必须对 1000 个不同的对象集做出数百万次这样的选择,每个对象集包含 2,455 个对象。 每个集合都会相互交换对象,因此随机选择器需要是动态的。 1000组2,433个对象,即24.33亿个对象; 低内存消耗至关重要。 由于这些选择并不是算法的大部分,所以我需要这个过程非常快; CPU 时间是有限的。

谢谢

更新:

好的,我试图明智地考虑你的建议,但时间如此有限......

我查看了二叉搜索树方法,它似乎太冒险了(复杂且复杂)。 其他建议都类似于 ActiveState 配方。 我对它进行了一些修改,希望能够提高效率:

def windex(dict, sum, max):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    n = random.uniform(0, 1)
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            break
        n = n - weight
    return key

我希望通过动态维护确定性和最大确定性的总和来获得效率增益。 欢迎任何进一步的建议。 你们为我节省了很多时间和精力,同时提高了我的效率,这太疯狂了。 谢谢! 谢谢! 谢谢!

更新2:

我决定通过让它一次选择更多的选择来提高它的效率。 这将导致我的算法的精度出现可接受的损失,因为它本质上是动态的。 不管怎样,这就是我现在所拥有的:

def weightedChoices(dict, sum, max, choices=10):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    list = [random.uniform(0, 1) for i in range(choices)]
    (n, list) = relavate(list.sort())
    keys = []
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            keys.append(key)
            if list: (n, list) = relavate(list)
            else: break
        n = n - weight
    return keys
def relavate(list):
    min = list[0]
    new = [l - min for l in list[1:]]
    return (min, new)

我还没有尝试过。 如果您有任何意见/建议,请不要犹豫。 谢谢!

更新3:

我一整天都在研究雷克斯·洛根答案的任务定制版本。 它实际上是一个特殊的字典类,而不是 2 个对象和权重数组; 这使得事情变得非常复杂,因为雷克斯的代码生成了一个随机索引......我还编写了一个测试用例,类似于我的算法中会发生的情况(但在我尝试之前我无法真正知道!)。 基本原则是:密钥随机生成的次数越多,再次生成的可能性就越小:

import random, time
import psyco
psyco.full()

class ProbDict():
    """
    Modified version of Rex Logans RandomObject class. The more a key is randomly
    chosen, the more unlikely it will further be randomly chosen. 
    """
    def __init__(self,keys_weights_values={}):
        self._kw=keys_weights_values
        self._keys=self._kw.keys()
        self._len=len(self._keys)
        self._findSeniors()
        self._effort = 0.15
        self._fails = 0
    def __iter__(self):
        return self.next()
    def __getitem__(self, key):
        return self._kw[key]
    def __setitem__(self, key, value):
        self.append(key, value)
    def __len__(self):
        return self._len
    def next(self):
        key=self._key()
        while key:
            yield key
            key = self._key()
    def __contains__(self, key):
        return key in self._kw
    def items(self):
        return self._kw.items()
    def pop(self, key):  
        try:
            (w, value) = self._kw.pop(key)
            self._len -=1
            if w == self._seniorW:
                self._seniors -= 1
                if not self._seniors:
                    #costly but unlikely:
                    self._findSeniors()
            return [w, value]
        except KeyError:
            return None
    def popitem(self):
        return self.pop(self._key())
    def values(self):
        values = []
        for key in self._keys:
            try:
                values.append(self._kw[key][1])
            except KeyError:
                pass
        return values
    def weights(self):
        weights = []
        for key in self._keys:
            try:
                weights.append(self._kw[key][0])
            except KeyError:
                pass
        return weights
    def keys(self, imperfect=False):
        if imperfect: return self._keys
        return self._kw.keys()
    def append(self, key, value=None):
        if key not in self._kw:
            self._len +=1
            self._kw[key] = [0, value]
            self._keys.append(key)
        else:
            self._kw[key][1]=value
    def _key(self):
        for i in range(int(self._effort*self._len)):
            ri=random.randint(0,self._len-1) #choose a random object
            rx=random.uniform(0,self._seniorW)
            rkey = self._keys[ri]
            try:
                w = self._kw[rkey][0]
                if rx >= w: # test to see if that is the value we want
                    w += 1
                    self._warnSeniors(w)
                    self._kw[rkey][0] = w
                    return rkey
            except KeyError:
                self._keys.pop(ri)
        # if you do not find one after 100 tries then just get a random one
        self._fails += 1 #for confirming effectiveness only
        for key in self._keys:
            if key in self._kw:
                w = self._kw[key][0] + 1
                self._warnSeniors(w)
                self._kw[key][0] = w
                return key
        return None
    def _findSeniors(self):
        '''this function finds the seniors, counts them and assess their age. It
        is costly but unlikely.'''
        seniorW = 0
        seniors = 0
        for w in self._kw.itervalues():
            if w >= seniorW:
                if w == seniorW:
                    seniors += 1
                else:
                    seniorsW = w
                    seniors = 1
        self._seniors = seniors
        self._seniorW = seniorW
    def _warnSeniors(self, w):
        #a weight can only be incremented...good
        if w >= self._seniorW:
            if w == self._seniorW:
                self._seniors+=1
            else:
                self._seniors = 1
                self._seniorW = w
def test():
    #test code
    iterations = 200000
    size = 2500
    nextkey = size 


    pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)]))
    start = time.clock()
    for i in xrange(iterations):
        key=pd._key()
        w=pd[key][0]
        if random.randint(0,1+pd._seniorW-w):
            #the heavier the object, the more unlikely it will be removed
            pd.pop(key)
        probAppend = float(500+(size-len(pd)))/1000
        if random.uniform(0,1) < probAppend:
            nextkey+=1
            pd.append(nextkey)
    print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations"
    weights = pd.weights()
    weights.sort()
    print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights)
    print weights
test()

仍然欢迎任何评论。 @Darius:你的二叉树对我来说太复杂了; 我不认为它的叶子可以有效地去除......谢谢大家

I have a bunch of keys that each have an unlikeliness variable. I want to randomly choose one of these keys, yet I want it to be more unlikely for unlikely (key, values) to be chosen than a less unlikely (a more likely) object. I am wondering if you would have any suggestions, preferably an existing python module that I could use, else I will need to make it myself.

I have checked out the random module; it does not seem to provide this.

I have to make such choices many millions of times for 1000 different sets of objects each containing 2,455 objects. Each set will exchange objects among each other so the random chooser needs to be dynamic. With 1000 sets of 2,433 objects, that is 2,433 million objects; low memory consumption is crucial. And since these choices are not the bulk of the algorithm, I need this process to be quite fast; CPU-time is limited.

Thx

Update:

Ok, I tried to consider your suggestions wisely, but time is so limited...

I looked at the binary search tree approach and it seems too risky (complex and complicated). The other suggestions all resemble the ActiveState recipe. I took it and modified it a little in the hope of making more efficient:

def windex(dict, sum, max):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    n = random.uniform(0, 1)
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            break
        n = n - weight
    return key

I am hoping to get an efficiency gain from dynamically maintaining the sum of certainties and the maximum certainty. Any further suggestions are welcome. You guys saves me so much time and effort, while increasing my effectiveness, it is crazy. Thx! Thx! Thx!

Update2:

I decided to make it more efficient by letting it choose more choices at once. This will result in an acceptable loss in precision in my algo for it is dynamic in nature. Anyway, here is what I have now:

def weightedChoices(dict, sum, max, choices=10):
    '''an attempt to make a random.choose() function that makes
    weighted choices accepts a dictionary with the item_key and
    certainty_value as a pair like:
    >>> x = [('one', 20), ('two', 2), ('three', 50)], the
    maximum certainty value (max) and the sum of all certainties.'''
    list = [random.uniform(0, 1) for i in range(choices)]
    (n, list) = relavate(list.sort())
    keys = []
    sum = max*len(list)-sum 
    for key, certainty in dict.iteritems():
        weight = float(max-certainty)/sum
        if n < weight:
            keys.append(key)
            if list: (n, list) = relavate(list)
            else: break
        n = n - weight
    return keys
def relavate(list):
    min = list[0]
    new = [l - min for l in list[1:]]
    return (min, new)

I haven't tried it out yet. If you have any comments/suggestions, please do not hesitate. Thx!

Update3:

I have been working all day on a task-tailored version of Rex Logan's answer. Instead of a 2 arrays of objects and weights, it is actually a special dictionary class; which makes things quite complex since Rex's code generates a random index... I also coded a test case that kind of resembles what will happen in my algo (but I can't really know until I try!). The basic principle is: the more a key is randomly generated often, the more unlikely it will be generated again:

import random, time
import psyco
psyco.full()

class ProbDict():
    """
    Modified version of Rex Logans RandomObject class. The more a key is randomly
    chosen, the more unlikely it will further be randomly chosen. 
    """
    def __init__(self,keys_weights_values={}):
        self._kw=keys_weights_values
        self._keys=self._kw.keys()
        self._len=len(self._keys)
        self._findSeniors()
        self._effort = 0.15
        self._fails = 0
    def __iter__(self):
        return self.next()
    def __getitem__(self, key):
        return self._kw[key]
    def __setitem__(self, key, value):
        self.append(key, value)
    def __len__(self):
        return self._len
    def next(self):
        key=self._key()
        while key:
            yield key
            key = self._key()
    def __contains__(self, key):
        return key in self._kw
    def items(self):
        return self._kw.items()
    def pop(self, key):  
        try:
            (w, value) = self._kw.pop(key)
            self._len -=1
            if w == self._seniorW:
                self._seniors -= 1
                if not self._seniors:
                    #costly but unlikely:
                    self._findSeniors()
            return [w, value]
        except KeyError:
            return None
    def popitem(self):
        return self.pop(self._key())
    def values(self):
        values = []
        for key in self._keys:
            try:
                values.append(self._kw[key][1])
            except KeyError:
                pass
        return values
    def weights(self):
        weights = []
        for key in self._keys:
            try:
                weights.append(self._kw[key][0])
            except KeyError:
                pass
        return weights
    def keys(self, imperfect=False):
        if imperfect: return self._keys
        return self._kw.keys()
    def append(self, key, value=None):
        if key not in self._kw:
            self._len +=1
            self._kw[key] = [0, value]
            self._keys.append(key)
        else:
            self._kw[key][1]=value
    def _key(self):
        for i in range(int(self._effort*self._len)):
            ri=random.randint(0,self._len-1) #choose a random object
            rx=random.uniform(0,self._seniorW)
            rkey = self._keys[ri]
            try:
                w = self._kw[rkey][0]
                if rx >= w: # test to see if that is the value we want
                    w += 1
                    self._warnSeniors(w)
                    self._kw[rkey][0] = w
                    return rkey
            except KeyError:
                self._keys.pop(ri)
        # if you do not find one after 100 tries then just get a random one
        self._fails += 1 #for confirming effectiveness only
        for key in self._keys:
            if key in self._kw:
                w = self._kw[key][0] + 1
                self._warnSeniors(w)
                self._kw[key][0] = w
                return key
        return None
    def _findSeniors(self):
        '''this function finds the seniors, counts them and assess their age. It
        is costly but unlikely.'''
        seniorW = 0
        seniors = 0
        for w in self._kw.itervalues():
            if w >= seniorW:
                if w == seniorW:
                    seniors += 1
                else:
                    seniorsW = w
                    seniors = 1
        self._seniors = seniors
        self._seniorW = seniorW
    def _warnSeniors(self, w):
        #a weight can only be incremented...good
        if w >= self._seniorW:
            if w == self._seniorW:
                self._seniors+=1
            else:
                self._seniors = 1
                self._seniorW = w
def test():
    #test code
    iterations = 200000
    size = 2500
    nextkey = size 


    pd = ProbDict(dict([(i,[0,i]) for i in xrange(size)]))
    start = time.clock()
    for i in xrange(iterations):
        key=pd._key()
        w=pd[key][0]
        if random.randint(0,1+pd._seniorW-w):
            #the heavier the object, the more unlikely it will be removed
            pd.pop(key)
        probAppend = float(500+(size-len(pd)))/1000
        if random.uniform(0,1) < probAppend:
            nextkey+=1
            pd.append(nextkey)
    print (time.clock()-start)*1000/iterations, "msecs / iteration with", pd._fails, "failures /", iterations, "iterations"
    weights = pd.weights()
    weights.sort()
    print "avg weight:", float(sum(weights))/pd._len, max(weights), pd._seniorW, pd._seniors, len(pd), len(weights)
    print weights
test()

Any comments are still welcome. @Darius: your binary trees are too complex and complicated for me; and I do not think its leafs can be removed efficiently... Thx all

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(12

柠檬色的秋千 2024-07-20 04:37:21

这个 activestate 配方 提供了一种易于遵循的方法,特别是评论中的版本不需要您预先标准化您的权重:

import random

def weighted_choice(items):
    """items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    n = random.uniform(0, weight_total)
    for item, weight in items:
        if n < weight:
            return item
        n = n - weight
    return item

如果您有大量的项目,这会很慢。 在这种情况下,二分搜索可能会更好......但编写起来也会更复杂,如果样本量较小,则收益甚微。 如果您想遵循该路线,这里是 python 中的二分搜索方法的示例

(我建议对数据集上的两种方法进行一些快速性能测试。此类算法的不同方法的性能通常有点不直观。)


编辑:我采纳了自己的建议,因为我很好奇,就做了一些测试。

我比较了四种方法:

上面的weighted_choice函数。

像这样的二分搜索选择函数:

def weighted_choice_bisect(items):
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    return items[bisect.bisect(added_weights, random.random() * last_sum)][0]

1的编译版本:

def weighted_choice_compile(items):
    """returns a function that fetches a random item from items

    items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    def choice(uniform = random.uniform):
        n = uniform(0, weight_total)
        for item, weight in items:
            if n < weight:
                return item
            n = n - weight
        return item
    return choice

A编译版本 2:

def weighted_choice_bisect_compile(items):
    """Returns a function that makes a weighted random choice from items."""
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    def choice(rnd=random.random, bis=bisect.bisect):
        return items[bis(added_weights, rnd() * last_sum)][0]
    return choice

然后,我构建了一个大的选择列表,如下所示:

choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]

以及一个过于简单的分析函数:

def profiler(f, n, *args, **kwargs):
    start = time.time()
    for i in xrange(n):
        f(*args, **kwargs)
    return time.time() - start

结果:

(对该函数进行 1,000 次调用所花费的秒数。)

  • 简单的未编译:0.918624162674
  • 二进制未编译:1.01497793198
  • 简单编译:0.287325024605
  • 二进制编译:0.00327413797379

“编译”结果包括编译一次选择函数所需的平均时间。 (我对 1,000 次编译进行了计时,然后将该时间除以 1,000,并将结果添加到选择函数时间中。)

因此:如果您有一个很少更改的项目+权重列表,则二进制编译方法是迄今为止的 最快。

This activestate recipe gives an easy-to-follow approach, specifically the version in the comments that doesn't require you to pre-normalize your weights:

import random

def weighted_choice(items):
    """items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    n = random.uniform(0, weight_total)
    for item, weight in items:
        if n < weight:
            return item
        n = n - weight
    return item

This will be slow if you have a large list of items. A binary search would probably be better in that case... but would also be more complicated to write, for little gain if you have a small sample size. Here's an example of the binary search approach in python if you want to follow that route.

(I'd recommend doing some quick performance testing of both methods on your dataset. The performance of different approaches to this sort of algorithm is often a bit unintuitive.)


Edit: I took my own advice, since I was curious, and did a few tests.

I compared four approaches:

The weighted_choice function above.

A binary-search choice function like so:

def weighted_choice_bisect(items):
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    return items[bisect.bisect(added_weights, random.random() * last_sum)][0]

A compiling version of 1:

def weighted_choice_compile(items):
    """returns a function that fetches a random item from items

    items is a list of tuples in the form (item, weight)"""
    weight_total = sum((item[1] for item in items))
    def choice(uniform = random.uniform):
        n = uniform(0, weight_total)
        for item, weight in items:
            if n < weight:
                return item
            n = n - weight
        return item
    return choice

A compiling version of 2:

def weighted_choice_bisect_compile(items):
    """Returns a function that makes a weighted random choice from items."""
    added_weights = []
    last_sum = 0

    for item, weight in items:
        last_sum += weight
        added_weights.append(last_sum)

    def choice(rnd=random.random, bis=bisect.bisect):
        return items[bis(added_weights, rnd() * last_sum)][0]
    return choice

I then built a big list of choices like so:

choices = [(random.choice("abcdefg"), random.uniform(0,50)) for i in xrange(2500)]

And an excessively simple profiling function:

def profiler(f, n, *args, **kwargs):
    start = time.time()
    for i in xrange(n):
        f(*args, **kwargs)
    return time.time() - start

The results:

(Seconds taken for 1,000 calls to the function.)

  • Simple uncompiled: 0.918624162674
  • Binary uncompiled: 1.01497793198
  • Simple compiled: 0.287325024605
  • Binary compiled: 0.00327413797379

The "compiled" results include the average time taken to compile the choice function once. (I timed 1,000 compiles, then divided that time by 1,000, and added the result to the choice function time.)

So: if you have a list of items+weights which change very rarely, the binary compiled method is by far the fastest.

等数载,海棠开 2024-07-20 04:37:21

在对原始帖子的评论中,尼古拉斯·伦纳德 (Nicholas Leonard) 建议交换和采样都需要快速。 这是针对该案例的一个想法; 我没试过。

如果采样必须很快,我们可以使用值的数组及其概率的运行总和,并对运行总和进行二分搜索(密钥是统一随机数)——O(log( n)) 操作。 但是交换需要更新交换条目后出现的所有运行总和值——O(n) 操作。 (您可以选择仅交换列表末尾附近的项目吗?我假设不会。)

因此,让我们在这两个操作中以 O(log(n)) 为目标。 为每个要从中采样的集合保留一个二叉树,而不是数组。 叶子保存样本值及其(未标准化)概率。 分支节点保存其子节点的总概率。

要进行采样,请生成一个介于 0 和根的总概率之间的均匀随机数 x,然后沿树下降。 在每个分支,如果左子节点的总概率 <= x,则选择左子节点。 否则从 x 中减去左孩子的概率并向右走。 返回您达到的叶子值。

要进行交换,请从树中移除叶子并调整通向该叶子的分支(降低它们的总概率,并删除任何单子分支节点)。 将叶子插入目标树:您可以选择将其放置的位置,因此请保持平衡。 在每个级别随机选择一个孩子可能就足够了——这就是我开始的地方。 增加每个父节点的概率,回溯到根。

现在采样和交换平均都是 O(log(n))。 (如果需要保证平衡,一个简单的方法是在分支节点中添加另一个字段,保存整个子树中叶子的数量。添加叶子时,在每个级别选择叶子较少的子节点。这会导致 变得不平衡;如果集合之间的流量相当均匀,那么这不会成为问题,但如果是,则在删除期间使用遍历中每个节点的叶计数信息选择旋转。)

树仅通过删除而 更新:根据要求,这是一个基本的实现。 根本就没有调过 用法:

>>> t1 = build_tree([('one', 20), ('two', 2), ('three', 50)])
>>> t1
Branch(Leaf(20, 'one'), Branch(Leaf(2, 'two'), Leaf(50, 'three')))
>>> t1.sample()
Leaf(50, 'three')
>>> t1.sample()
Leaf(20, 'one')
>>> t2 = build_tree([('four', 10), ('five', 30)])
>>> t1a, t2a = transfer(t1, t2)
>>> t1a
Branch(Leaf(20, 'one'), Leaf(2, 'two'))
>>> t2a
Branch(Leaf(10, 'four'), Branch(Leaf(30, 'five'), Leaf(50, 'three')))

代码:

import random

def build_tree(pairs):
    tree = Empty()
    for value, weight in pairs:
        tree = tree.add(Leaf(weight, value))
    return tree

def transfer(from_tree, to_tree):
    """Given a nonempty tree and a target, move a leaf from the former to
    the latter. Return the two updated trees."""
    leaf, from_tree1 = from_tree.extract()
    return from_tree1, to_tree.add(leaf)

class Tree:
    def add(self, leaf):
        "Return a new tree holding my leaves plus the given leaf."
        abstract
    def sample(self):
        "Pick one of my leaves at random in proportion to its weight."
        return self.sampling(random.uniform(0, self.weight))
    def extract(self):
        """Pick one of my leaves and return it along with a new tree
        holding my leaves minus that one leaf."""
        return self.extracting(random.uniform(0, self.weight))        

class Empty(Tree):
    weight = 0
    def __repr__(self):
        return 'Empty()'
    def add(self, leaf):
        return leaf
    def sampling(self, weight):
        raise Exception("You can't sample an empty tree")
    def extracting(self, weight):
        raise Exception("You can't extract from an empty tree")

class Leaf(Tree):
    def __init__(self, weight, value):
        self.weight = weight
        self.value = value
    def __repr__(self):
        return 'Leaf(%r, %r)' % (self.weight, self.value)
    def add(self, leaf):
        return Branch(self, leaf)
    def sampling(self, weight):
        return self
    def extracting(self, weight):
        return self, Empty()

def combine(left, right):
    if isinstance(left, Empty): return right
    if isinstance(right, Empty): return left
    return Branch(left, right)

class Branch(Tree):
    def __init__(self, left, right):
        self.weight = left.weight + right.weight
        self.left = left
        self.right = right
    def __repr__(self):
        return 'Branch(%r, %r)' % (self.left, self.right)
    def add(self, leaf):
        # Adding to a random branch as a clumsy way to keep an
        # approximately balanced tree.
        if random.random() < 0.5:
            return combine(self.left.add(leaf), self.right)
        return combine(self.left, self.right.add(leaf))
    def sampling(self, weight):
        if weight < self.left.weight:
            return self.left.sampling(weight)
        return self.right.sampling(weight - self.left.weight)
    def extracting(self, weight):
        if weight < self.left.weight:
            leaf, left1 = self.left.extracting(weight)
            return leaf, combine(left1, self.right)
        leaf, right1 = self.right.extracting(weight - self.left.weight)
        return leaf, combine(self.left, right1)

更新2:回答另一个问题,Jason Orendorff 指出,通过像经典堆结构一样用数组表示二叉树,可以保持二叉树的完美平衡。 (这也节省了指针上花费的空间。)请参阅我对该答案的评论,了解如何使他的代码适应这个问题。

In comments on the original post, Nicholas Leonard suggests that both the exchanging and the sampling need to be fast. Here's an idea for that case; I haven't tried it.

If only sampling had to be fast, we could use an array of the values together with the running sum of their probabilities, and do a binary search on the running sum (with key being a uniform random number) -- an O(log(n)) operation. But an exchange would require updating all of the running-sum values appearing after the entries exchanged -- an O(n) operation. (Could you choose to exchange only items near the end of their lists? I'll assume not.)

So let's aim for O(log(n)) in both operations. Instead of an array, keep a binary tree for each set to sample from. A leaf holds the sample value and its (unnormalized) probability. A branch node holds the total probability of its children.

To sample, generate a uniform random number x between 0 and the total probability of the root, and descend the tree. At each branch, choose the left child if the left child has total probability <= x. Else subtract the left child's probability from x and go right. Return the leaf value you reach.

To exchange, remove the leaf from its tree and adjust the branches that lead down to it (decreasing their total probability, and cutting out any single-child branch nodes). Insert the leaf into the destination tree: you have a choice of where to put it, so keep it balanced. Picking a random child at each level is probably good enough -- that's where I'd start. Increase each parent node's probability, back up to the root.

Now both sampling and exchange are O(log(n)) on average. (If you need guaranteed balance, a simple way is to add another field to the branch nodes holding the count of leaves in the whole subtree. When adding a leaf, at each level pick the child with fewer leaves. This leaves the possibility of a tree getting unbalanced solely by deletions; this can't be a problem if there's reasonably even traffic between the sets, but if it is, then choose rotations during deletion using the leaf-count information on each node in your traversal.)

Update: On request, here's a basic implementation. Haven't tuned it at all. Usage:

>>> t1 = build_tree([('one', 20), ('two', 2), ('three', 50)])
>>> t1
Branch(Leaf(20, 'one'), Branch(Leaf(2, 'two'), Leaf(50, 'three')))
>>> t1.sample()
Leaf(50, 'three')
>>> t1.sample()
Leaf(20, 'one')
>>> t2 = build_tree([('four', 10), ('five', 30)])
>>> t1a, t2a = transfer(t1, t2)
>>> t1a
Branch(Leaf(20, 'one'), Leaf(2, 'two'))
>>> t2a
Branch(Leaf(10, 'four'), Branch(Leaf(30, 'five'), Leaf(50, 'three')))

Code:

import random

def build_tree(pairs):
    tree = Empty()
    for value, weight in pairs:
        tree = tree.add(Leaf(weight, value))
    return tree

def transfer(from_tree, to_tree):
    """Given a nonempty tree and a target, move a leaf from the former to
    the latter. Return the two updated trees."""
    leaf, from_tree1 = from_tree.extract()
    return from_tree1, to_tree.add(leaf)

class Tree:
    def add(self, leaf):
        "Return a new tree holding my leaves plus the given leaf."
        abstract
    def sample(self):
        "Pick one of my leaves at random in proportion to its weight."
        return self.sampling(random.uniform(0, self.weight))
    def extract(self):
        """Pick one of my leaves and return it along with a new tree
        holding my leaves minus that one leaf."""
        return self.extracting(random.uniform(0, self.weight))        

class Empty(Tree):
    weight = 0
    def __repr__(self):
        return 'Empty()'
    def add(self, leaf):
        return leaf
    def sampling(self, weight):
        raise Exception("You can't sample an empty tree")
    def extracting(self, weight):
        raise Exception("You can't extract from an empty tree")

class Leaf(Tree):
    def __init__(self, weight, value):
        self.weight = weight
        self.value = value
    def __repr__(self):
        return 'Leaf(%r, %r)' % (self.weight, self.value)
    def add(self, leaf):
        return Branch(self, leaf)
    def sampling(self, weight):
        return self
    def extracting(self, weight):
        return self, Empty()

def combine(left, right):
    if isinstance(left, Empty): return right
    if isinstance(right, Empty): return left
    return Branch(left, right)

class Branch(Tree):
    def __init__(self, left, right):
        self.weight = left.weight + right.weight
        self.left = left
        self.right = right
    def __repr__(self):
        return 'Branch(%r, %r)' % (self.left, self.right)
    def add(self, leaf):
        # Adding to a random branch as a clumsy way to keep an
        # approximately balanced tree.
        if random.random() < 0.5:
            return combine(self.left.add(leaf), self.right)
        return combine(self.left, self.right.add(leaf))
    def sampling(self, weight):
        if weight < self.left.weight:
            return self.left.sampling(weight)
        return self.right.sampling(weight - self.left.weight)
    def extracting(self, weight):
        if weight < self.left.weight:
            leaf, left1 = self.left.extracting(weight)
            return leaf, combine(left1, self.right)
        leaf, right1 = self.right.extracting(weight - self.left.weight)
        return leaf, combine(self.left, right1)

Update 2: In answering another problem, Jason Orendorff points out that the binary trees can be kept perfectly balanced by representing them in an array just like the classical heap structure. (This saves the space spent on pointers, too.) See my comments to that answer for how to adapt his code to this problem.

葵雨 2024-07-20 04:37:21

我建议你移植 这个 PHP 实现加权随机到Python。 特别是,基于二分搜索的第二种算法有助于解决您的速度问题。

I suggest you port this PHP implementation of weighted random to Python. In particular, the binary-search-based second algorithm helps address your speed concerns.

栀梦 2024-07-20 04:37:21

我会使用这个 食谱 。 您需要为对象添加权重,但这只是一个简单的比率,并将它们放入元组列表中(对象,信念/(信念之和))。 使用列表理解应该很容易做到这一点。

I would use this recipe . You will need to add a weight to your objects, but that is just a simple ratio and put them in a list of tuples (object, conviction/(sum of convictions)). This should be easy to do using a list comprehension.

宁愿没拥抱 2024-07-20 04:37:21

这是一种经典的伪代码方法,其中 random.random() 为您提供从 0 到 1 的随机浮点数。

let z = sum of all the convictions
let choice = random.random() * z 
iterate through your objects:
    choice = choice - the current object's conviction
    if choice <= 0, return this object
return the last object

举个例子:假设您有两个对象,一个权重为 2,另一个权重为 4。您生成0 到 6 之间的数字。如果 choice 介于 0 和 2 之间(发生概率为 2/6 = 1/3),则它将减去 2,并选择第一个对象。 如果选择在 2 和 6 之间,这将以 4/6 = 2/3 的概率发生,那么第一次减法仍然有选择 > 1。 0,第二次减法将使第二个对象被选中。

Here is a classic way to do it, in pseudocode, where random.random() gives you a random float from 0 to 1.

let z = sum of all the convictions
let choice = random.random() * z 
iterate through your objects:
    choice = choice - the current object's conviction
    if choice <= 0, return this object
return the last object

For an example: imagine you have two objects, one with weight 2, another with weight 4. You generate a number from 0 to 6. If choice is between 0 and 2, which will happen with 2/6 = 1/3 probability, then it will get subtracted by 2 and the first object is chosen. If choice is between 2 and 6, which will happen with 4/6 = 2/3 probability, then the first subtraction will still have choice being > 0, and the second subtraction will make the 2nd object get chosen.

花想c 2024-07-20 04:37:21

您想给每个对象一个重量。 重量越大,发生这种情况的可能性就越大。 更准确地说,probx =weight/sum_all_weights。

然后生成一个0到sum_all_weights范围内的随机数
并将其映射到每个对象。

此代码允许您生成随机索引,并在创建对象时进行映射以提高速度。 如果所有对象集都具有相同的分布,那么您只需使用一个 RandomIndex 对象即可。

import random

class RandomIndex:
    def __init__(self, wlist):
        self._wi=[]
        self._rsize=sum(wlist)-1
        self._m={}
        i=0
        s=wlist[i]
        for n in range(self._rsize+1):
            if n == s:
                i+=1
                s+=wlist[i]
            self._m[n]=i    

    def i(self):
        rn=random.randint(0,self._rsize)
        return self._m[rn]


sx=[1,2,3,4]


wx=[1,10,100,1000] #weight list
ri=RandomIndex(wx)

cnt=[0,0,0,0]

for i in range(1000):
    cnt[ri.i()] +=1  #keep track of number of times each index was generated

print(cnt)  

You want to give each object a weight. The bigger the weight the more likely it will happen. More precisely probx =weight/sum_all_weights.

Then generate a random number in the range 0 to sum_all_weights
and map it to each object.

This code allows you to generate a random index and it is mapped when the object is created for speed. If all of your sets of objects have the same distribution then you can get by with only one RandomIndex object.

import random

class RandomIndex:
    def __init__(self, wlist):
        self._wi=[]
        self._rsize=sum(wlist)-1
        self._m={}
        i=0
        s=wlist[i]
        for n in range(self._rsize+1):
            if n == s:
                i+=1
                s+=wlist[i]
            self._m[n]=i    

    def i(self):
        rn=random.randint(0,self._rsize)
        return self._m[rn]


sx=[1,2,3,4]


wx=[1,10,100,1000] #weight list
ri=RandomIndex(wx)

cnt=[0,0,0,0]

for i in range(1000):
    cnt[ri.i()] +=1  #keep track of number of times each index was generated

print(cnt)  
浅听莫相离 2024-07-20 04:37:21

大约 3 年后...

如果您使用 numpy,也许最简单的选择是使用 np.random.choice,它采用可能值的列表和一个可选值与每个值相关的概率序列:

import numpy as np

values = ('A', 'B', 'C', 'D')
weights = (0.5, 0.1, 0.2, 0.2)

print ''.join(np.random.choice(values, size=60, replace=True, p=weights))
# ACCADAACCDACDBACCADCAAAAAAADACCDCAADDDADAAACCAAACBAAADCADABA

About 3 years later...

If you use numpy, perhaps the simplest option is to use np.random.choice, which takes a list of possible values, and an optional sequence of probabilities associated with each value:

import numpy as np

values = ('A', 'B', 'C', 'D')
weights = (0.5, 0.1, 0.2, 0.2)

print ''.join(np.random.choice(values, size=60, replace=True, p=weights))
# ACCADAACCDACDBACCADCAAAAAAADACCDCAADDDADAAACCAAACBAAADCADABA
禾厶谷欠 2024-07-20 04:37:21

最简单的方法是使用 random.choice (它使用均匀分布)并改变源集合中对象的出现频率。

>>> random.choice([1, 2, 3, 4])
4

... vs:

>>> random.choice([1, 1, 1, 1, 2, 2, 2, 3, 3, 4])
2

因此,您的对象可能具有基本发生率 (n),并且 1 到 n 之间的对象将作为定罪率的函数添加到源集合中。 这个方法其实很简单; 然而,如果不同对象的数量很大或者信念率需要非常细粒度,它可能会产生巨大的开销。

或者,如果使用均匀分布生成多个随机数并对它们求和,则接近平均值的数字更有可能出现在极端值附近的数字(想象一下掷两个骰子,得到 7 与 12 或 2 的概率)。 然后,您可以按定罪率对对象进行排序,并使用多个骰子生成一个数字,用于计算和索引对象。 使用接近平均值的数字来索引低信念对象,使用接近极值的数字来索引高信念项目。 您可以通过更改“面数”和“骰子”数量来改变选择给定对象的精确概率(将对象放入桶中并使用面数较少的骰子可能更简单,而不是使用面数较少的骰子)尝试将每个对象与特定结果关联起来):

>>> die = lambda sides : random.randint(1, sides)
>>> die(6)
3
>>> die(6) + die(6) + die(6)
10

The simplest thing to do is to use random.choice (which uses a uniform distribution) and vary the frequency of occurrence on the object in the source collection.

>>> random.choice([1, 2, 3, 4])
4

... vs:

>>> random.choice([1, 1, 1, 1, 2, 2, 2, 3, 3, 4])
2

So your objects could have a base occurrence rate (n) and between 1 and n objects are added to the source collection as a function of the conviction rate. This method is really simple; however, it can have significant overhead if the number of distinct objects is large or the conviction rate needs to be very fine grained.

Alternatively, if you generate more that one random number using a uniform distribution and sum them, numbers occurring near the mean are more probable that those occurring near the extremes (think of rolling two dice and the probability of getting 7 versus 12 or 2). You can then order the objects by conviction rate and generate a number using multiple die rolls which you use to calculate and index into the objects. Use numbers near the mean to index low conviction objects and numbers near the extremes to index high conviction items. You can vary the precise probability that a given object will be selected by changing the "number of sides" and number of your "dice" (it may be simpler to put the objects into buckets and use dice with a small number of sides rather than trying to associate each object with a specific result):

>>> die = lambda sides : random.randint(1, sides)
>>> die(6)
3
>>> die(6) + die(6) + die(6)
10
素染倾城色 2024-07-20 04:37:21

执行此操作的一种非常简单的方法是为每个值设置权重,并且不需要太多内存。

您可能可以使用哈希/字典来执行此操作。

您需要做的是将随机数 x 与您想要选择的整个对象集相乘并求和,然后将结果除以集合中的对象数量。

伪代码:

objectSet = [(object1, weight1), ..., (objectN, weightN)]
sum = 0
rand = random()
for obj, weight in objectSet
    sum = sum+weight*rand
choice = objectSet[floor(sum/objectSet.size())]

编辑:我只是想到我的代码对于非常大的集合(它是 O(n))会有多慢。 下面的伪代码是 O(log(n)),并且基本上使用二分搜索。

objectSet = [(object1, weight1), ..., (objectN, weightN)]
sort objectSet from less to greater according to weights
choice = random() * N # where N is the number of objects in objectSet
do a binary search until you have just one answer

网上到处都有Python的二分查找实现,这里就不再赘述了。

A very easy and simple way of doing this is to set weights for each of the values, and it wouldn't require much memory.

You could probably use a hash/dictionary to do this.

What you'll want to do is to have the random number, x, multiplied and summed over the entire set of things you want selected, and divide that result over the number of objects in your set.

Pseudo-code:

objectSet = [(object1, weight1), ..., (objectN, weightN)]
sum = 0
rand = random()
for obj, weight in objectSet
    sum = sum+weight*rand
choice = objectSet[floor(sum/objectSet.size())]

EDIT: I just thought of how slow my code would be with very large sets (it's O(n)). The following pseudo-code is O(log(n)), and is basically using a binary search.

objectSet = [(object1, weight1), ..., (objectN, weightN)]
sort objectSet from less to greater according to weights
choice = random() * N # where N is the number of objects in objectSet
do a binary search until you have just one answer

There are implementations of binary search in Python all over the 'net, so no need repeating here.

〗斷ホ乔殘χμё〖 2024-07-20 04:37:21

这是针对特殊概率分布的更好答案,Rex Logan 的答案似乎面向。 分布是这样的:每个对象都有一个0到100之间的整数权重,它的概率与其权重成正比。 由于这是目前公认的答案,我想这值得考虑。

因此保留 101 个 bin 的数组。 每个箱子都包含所有具有特定重量的物体的列表。 每个垃圾箱还知道其所有物体的总重量

抽样:按照其总重量的比例随机选取一个箱。 (为此使用标准方法之一 - 线性或二分搜索。)然后从垃圾箱中均匀随机挑选一个对象。

要传输对象:将其从其垃圾箱中删除,将其放入目标中的垃圾箱中,然后更新两个垃圾箱的权重。 (如果您使用二分搜索进行采样,则还必须更新所使用的运行总和。这仍然相当快,因为​​箱数不多。)

Here's a better answer for a special probability distribution, the one Rex Logan's answer seems to be geared at. The distribution is like this: each object has an integer weight between 0 and 100, and its probability is in proportion to its weight. Since that's the currently accepted answer, I guess this is worth thinking about.

So keep an array of 101 bins. Each bin holds a list of all of the objects with its particular weight. Each bin also knows the total weight of all its objects.

To sample: pick a bin at random in proportion to its total weight. (Use one of the standard recipes for this -- linear or binary search.) Then pick an object from the bin uniformly at random.

To transfer an object: remove it from its bin, put it in its bin in the target, and update both bins' weights. (If you're using binary search for sampling, you must also update the running sums that uses. This is still reasonably fast since there aren't many bins.)

皇甫轩 2024-07-20 04:37:21

(一年后)
Walker 对不同概率的随机对象的别名方法非常快速且非常简单

蝶…霜飞 2024-07-20 04:37:21

我需要更快的函数,用于非非常大的数字。 这是 Visual C++ 中的:

#undef _DEBUG // disable linking with python25_d.dll
#include <Python.h>
#include <malloc.h>
#include <stdlib.h>

static PyObject* dieroll(PyObject *, PyObject *args)
{
    PyObject *list;
    if (!PyArg_ParseTuple(args, "O:decompress", &list))
        return NULL;

    if (!PyList_Check(list)) 
        return PyErr_Format(PyExc_TypeError, "list of numbers expected ('%s' given)", list->ob_type->tp_name), NULL;

    int size = PyList_Size(list);

    if (size < 1)
        return PyErr_Format(PyExc_TypeError, "got empty list"), NULL;

    long *array = (long*)alloca(size*sizeof(long));

    long sum = 0;
    for (int i = 0; i < size; i++) {
        PyObject *o = PyList_GetItem(list, i);

        if (!PyInt_Check(o))
            return PyErr_Format(PyExc_TypeError, "list of ints expected ('%s' found)", o->ob_type->tp_name), NULL;
        long n = PyInt_AsLong(o);
        if (n == -1 && PyErr_Occurred())
            return NULL;
        if (n < 0)
            return PyErr_Format(PyExc_TypeError, "list of positive ints expected (negative found)"), NULL;

        sum += n; //NOTE: integer overflow
        array[i] = sum;
    }

    if (sum <= 0)
        return PyErr_Format(PyExc_TypeError, "sum of numbers is not positive"), NULL;

    int r = rand() * (sum-1) / RAND_MAX; //NOTE: rand() may be too small (0x7fff).    rand() * sum may result in integer overlow.

    assert(array[size-1] == sum);
    assert(r < sum && r < array[size-1]);
    for (int i = 0; i < size; ++i)
    {
        if (r < array[i])
            return PyInt_FromLong(i);
    }
    return PyErr_Format(PyExc_TypeError, "internal error."), NULL;
}

static PyMethodDef module_methods[] = 
{
    {"dieroll", (PyCFunction)dieroll, METH_VARARGS, "random index, beased on weights" },
    {NULL}  /* Sentinel */
};

PyMODINIT_FUNC initdieroll(void) 
{
    PyObject *module = Py_InitModule3("dieroll", module_methods, "dieroll");
    if (module == NULL)
        return;
}

I was needed in faster functions, for non very large numbers. So here it is, in Visual C++:

#undef _DEBUG // disable linking with python25_d.dll
#include <Python.h>
#include <malloc.h>
#include <stdlib.h>

static PyObject* dieroll(PyObject *, PyObject *args)
{
    PyObject *list;
    if (!PyArg_ParseTuple(args, "O:decompress", &list))
        return NULL;

    if (!PyList_Check(list)) 
        return PyErr_Format(PyExc_TypeError, "list of numbers expected ('%s' given)", list->ob_type->tp_name), NULL;

    int size = PyList_Size(list);

    if (size < 1)
        return PyErr_Format(PyExc_TypeError, "got empty list"), NULL;

    long *array = (long*)alloca(size*sizeof(long));

    long sum = 0;
    for (int i = 0; i < size; i++) {
        PyObject *o = PyList_GetItem(list, i);

        if (!PyInt_Check(o))
            return PyErr_Format(PyExc_TypeError, "list of ints expected ('%s' found)", o->ob_type->tp_name), NULL;
        long n = PyInt_AsLong(o);
        if (n == -1 && PyErr_Occurred())
            return NULL;
        if (n < 0)
            return PyErr_Format(PyExc_TypeError, "list of positive ints expected (negative found)"), NULL;

        sum += n; //NOTE: integer overflow
        array[i] = sum;
    }

    if (sum <= 0)
        return PyErr_Format(PyExc_TypeError, "sum of numbers is not positive"), NULL;

    int r = rand() * (sum-1) / RAND_MAX; //NOTE: rand() may be too small (0x7fff).    rand() * sum may result in integer overlow.

    assert(array[size-1] == sum);
    assert(r < sum && r < array[size-1]);
    for (int i = 0; i < size; ++i)
    {
        if (r < array[i])
            return PyInt_FromLong(i);
    }
    return PyErr_Format(PyExc_TypeError, "internal error."), NULL;
}

static PyMethodDef module_methods[] = 
{
    {"dieroll", (PyCFunction)dieroll, METH_VARARGS, "random index, beased on weights" },
    {NULL}  /* Sentinel */
};

PyMODINIT_FUNC initdieroll(void) 
{
    PyObject *module = Py_InitModule3("dieroll", module_methods, "dieroll");
    if (module == NULL)
        return;
}
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文