Python 位掩码(可变长度)

发布于 2024-10-16 06:51:39 字数 896 浏览 2 评论 0原文

为了解决一个研究问题,我们必须在 python 中组织位掩码搜索。 作为输入,我们有一个原始数据(我们将其表示为一系列位)。大小约为 1.5Gb。 作为输出,我们必须获取特定位掩码出现的次数。 让我举一个例子来描述这种情况,

input:    sequence of bits, a bitmask to search(mask length: 12bits)

第一个想法(效率不高)是像这样使用 XOR:

1step: from input we take 12 first bits(position 0 to 11) and make XOR with mask 
2step: from input we take bits from 1 to 12 position and XOR with mask ...

让我们进行第 2 个步骤:

input sequence 100100011110101010110110011010100101010110101010
mask to search: 100100011110
step 1: take first 12 bits from input: 100100011110 and XOR it with mask.
step 2: teke bits from 1 to 12position: 001000111101 and XOR it with mask.
...

问题是:如何组织从输入中获取位? 我们能够获取前 12 位,但是我们如何获取我们需要进行下一次迭代的 1 到 12 位的位呢?

之前我们使用Python BitString包,但是我们搜索所有掩码所花费的时间太长了。 还有一个。掩码的大小可以从 12 位到 256 位。 有什么建议吗?任务必须在Python中实现

in order to solve one research problem we have to organise the bitmask search in python.
As an input we have a raw data (we present it as a sequence of bits). Size is smth about 1,5Gb.
as an output we have to get the number of occurrence of a specific bitmasks.
Let me give an example to discribe the situation

input:    sequence of bits, a bitmask to search(mask length: 12bits)

the first idea (not efficient one) is to use XOR like this:

1step: from input we take 12 first bits(position 0 to 11) and make XOR with mask 
2step: from input we take bits from 1 to 12 position and XOR with mask ...

let us proceed 2 first steps:

input sequence 100100011110101010110110011010100101010110101010
mask to search: 100100011110
step 1: take first 12 bits from input: 100100011110 and XOR it with mask.
step 2: teke bits from 1 to 12position: 001000111101 and XOR it with mask.
...

the problem is: how to organise taking bits from an input ?
we are able to take first 12 bits, but how can we take bits from 1 to 12 position those we need to proceed next iteration?

Before we use a python BitString package, but the time we spend to search all mask are to high.
And one more. The size of a mask can be fron 12 bits up to 256.
have any suggestions? task have to be implemented in python

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

冬天旳寂寞 2024-10-23 06:51:39

您的算法是在数据中搜索“字符串”的简单方法,但幸运的是有更好的算法。
一个示例是 KMP 算法,但还有其他算法可能更适合您的用例。

使用更好的算法,您可以将复杂度从 O(n*m) 降低到 O(n+m)

Your algorithm is the naive way to search for "strings" in data, but luckily there are much better algorithms.
One example is the KMP algorithm, but there are others that might fit your use case better.

With a better algorithm you can go from complexity of O(n*m) to O(n+m).

何必那么矫情 2024-10-23 06:51:39

如果您的掩码是 8 位的倍数,则您的搜索将变成相对简单的字节比较,并且任何

sequence = <list of 8-bit integers>
mask = [0b10010001, 0b01101101]
matches = my_substring_search(sequence, mask)

="nofollow">子字符串搜索算法就足够了(我 如果掩码大于 8 位但不是 8 的倍数,我建议将掩码截断为 8 的倍数并使用与上面相同的子字符串搜索。然后,对于找到的任何匹配项,您可以测试其余的匹配项。

sequence = <list of 8-bit integers>
mask_a = [0b10010001]
mask_b = 0b01100000
mask_b_pattern = 0b11110000   # relevant bits of mask_b
matches = my_substring_search(sequence, mask_a)

for match in matches:
    if (sequence[match+len(mask_a)] & mask_b_pattern) == mask_b:
        valid_match = True  # or something more useful...

如果 sequence 是 4096 字节的列表,您可能需要考虑各部分之间的重叠。这可以通过将 sequence 设为 4096+ceil(mask_bits/8.0) 字节的列表来轻松完成,但每次读取下一个块时仍仅前进 4096。


作为生成和使用这些掩码的演示:

class Mask(object):
    def __init__(self, source, source_mask):
        self._masks = list(self._generate_masks(source, source_mask))

    def match(self, buffer, i, j):
        return any(m.match(buffer, i, j) for m in self._masks)

    class MaskBits(object):
        def __init__(self, pre, pre_mask, match_bytes, post, post_mask):
            self.match_bytes = match_bytes
            self.pre, self.pre_mask = pre, pre_mask
            self.post, self.post_mask = post, post_mask

        def __repr__(self):
            return '(%02x %02x) (%s) (%02x %02x)' % (self.pre, self.pre_mask,
                ', '.join('%02x' % m for m in self.match_bytes),
                self.post, self.post_mask)

        def match(self, buffer, i, j):
            return (buffer[i:j] == self.match_bytes and
                    buffer[i-1] & self.pre_mask == self.pre and
                    buffer[j] & self.post_mask == self.post)

    def _generate_masks(self, src, src_mask):
        pre_mask = 0
        pre = 0
        post_mask = 0
        post = 0
        while pre_mask != 0xFF:
            src_bytes = []
            for i in (24, 16, 8, 0):
                if (src_mask >> i) & 0xFF == 0xFF:
                    src_bytes.append((src >> i) & 0xFF)
                else:
                    post_mask = (src_mask >> i) & 0xFF
                    post = (src >> i) & 0xFF
                    break
            yield self.MaskBits(pre, pre_mask, src_bytes, post, post_mask)
            pre += pre
            pre_mask += pre_mask
            if src & 0x80000000: pre |= 0x00000001
            pre_mask |= 0x00000001
            src = (src & 0x7FFFFFFF) * 2
            src_mask = (src_mask & 0x7FFFFFFF) * 2

此代码不是完整的搜索算法,它构成验证匹配的一部分。 Mask 对象由源值和源掩码构造而成,两者均左对齐且(在此实现中)长度为 32 位:

src = 0b11101011011011010101001010100000
src_mask = 0b11111111111111111111111111100000

缓冲区是字节值列表:

buffer_1 = [0x7e, 0xb6, 0xd5, 0x2b, 0x88]

Mask 对象生成移位掩码的内部列表:

>> m = Mask(src, src_mask)
>> m._masks
[(00 00) (eb, 6d, 52) (a0 e0),
 (01 01) (d6, da, a5) (40 c0),
 (03 03) (ad, b5, 4a) (80 80),
 (07 07) (5b, 6a, 95) (00 00),
 (0e 0f) (b6, d5) (2a fe),
 (1d 1f) (6d, aa) (54 fc),
 (3a 3f) (db, 54) (a8 f8),
 (75 7f) (b6, a9) (50 f0)]

中间element 是精确匹配子字符串(没有一种巧妙的方法可以按原样从该对象中获取它,但它是 m._masks[i].match_bytes)。使用有效的算法找到该子序列后,您可以使用 m.match(buffer, i, j) 验证周围的位,其中 i 是第一个匹配字节,j 是最后一个匹配字节之后的字节索引(例如 buffer[i:j] == match_bytes)。

在上面的buffer中,可以从第5位开始找到位序列,这意味着可以在buffer[1:3处找到_masks[4].match_bytes ]。结果:(

>> m.match(buffer, 1, 3)
True

随意以任何可能的方式使用、改编、修改、出售或折磨此代码。我非常喜欢将它组合在一起 - 一个有趣的问题 - 尽管我不会对任何错误负责,所以请确保你彻底测试一下!)

Where your mask is a multiple of 8 bits, your search becomes a relatively trivial byte comparison and any substring search algorithm will suffice (I would not recommend converting to a string and using the built in search, since you will likely suffer from failed character validation issues.)

sequence = <list of 8-bit integers>
mask = [0b10010001, 0b01101101]
matches = my_substring_search(sequence, mask)

For a mask of greater than 8 bits but not a multiple of eight, I would suggest truncating the mask to a multiple of 8 and using the same substring search as above. Then for any matches found, you can test the remainder.

sequence = <list of 8-bit integers>
mask_a = [0b10010001]
mask_b = 0b01100000
mask_b_pattern = 0b11110000   # relevant bits of mask_b
matches = my_substring_search(sequence, mask_a)

for match in matches:
    if (sequence[match+len(mask_a)] & mask_b_pattern) == mask_b:
        valid_match = True  # or something more useful...

If sequence is a list of 4096 bytes, you may need to account for the overlap between sections. This can be easily done by making sequence a list of 4096+ceil(mask_bits/8.0) bytes, but still advancing by only 4096 each time you read the next block.


As a demonstration of generating and using these masks:

class Mask(object):
    def __init__(self, source, source_mask):
        self._masks = list(self._generate_masks(source, source_mask))

    def match(self, buffer, i, j):
        return any(m.match(buffer, i, j) for m in self._masks)

    class MaskBits(object):
        def __init__(self, pre, pre_mask, match_bytes, post, post_mask):
            self.match_bytes = match_bytes
            self.pre, self.pre_mask = pre, pre_mask
            self.post, self.post_mask = post, post_mask

        def __repr__(self):
            return '(%02x %02x) (%s) (%02x %02x)' % (self.pre, self.pre_mask,
                ', '.join('%02x' % m for m in self.match_bytes),
                self.post, self.post_mask)

        def match(self, buffer, i, j):
            return (buffer[i:j] == self.match_bytes and
                    buffer[i-1] & self.pre_mask == self.pre and
                    buffer[j] & self.post_mask == self.post)

    def _generate_masks(self, src, src_mask):
        pre_mask = 0
        pre = 0
        post_mask = 0
        post = 0
        while pre_mask != 0xFF:
            src_bytes = []
            for i in (24, 16, 8, 0):
                if (src_mask >> i) & 0xFF == 0xFF:
                    src_bytes.append((src >> i) & 0xFF)
                else:
                    post_mask = (src_mask >> i) & 0xFF
                    post = (src >> i) & 0xFF
                    break
            yield self.MaskBits(pre, pre_mask, src_bytes, post, post_mask)
            pre += pre
            pre_mask += pre_mask
            if src & 0x80000000: pre |= 0x00000001
            pre_mask |= 0x00000001
            src = (src & 0x7FFFFFFF) * 2
            src_mask = (src_mask & 0x7FFFFFFF) * 2

This code is not a complete search algorithm, it forms part of validating matches. The Mask object is constructed with a source value and a source mask, both left aligned and (in this implementation) 32-bits long:

src = 0b11101011011011010101001010100000
src_mask = 0b11111111111111111111111111100000

The buffer is a list of byte values:

buffer_1 = [0x7e, 0xb6, 0xd5, 0x2b, 0x88]

A Mask object generates an internal list of shifted masks:

>> m = Mask(src, src_mask)
>> m._masks
[(00 00) (eb, 6d, 52) (a0 e0),
 (01 01) (d6, da, a5) (40 c0),
 (03 03) (ad, b5, 4a) (80 80),
 (07 07) (5b, 6a, 95) (00 00),
 (0e 0f) (b6, d5) (2a fe),
 (1d 1f) (6d, aa) (54 fc),
 (3a 3f) (db, 54) (a8 f8),
 (75 7f) (b6, a9) (50 f0)]

The middle element is the exact match substring (there is no neat way to get this out of this object as is, but it's m._masks[i].match_bytes). Once you have used an efficient algorithm to find this subsequence, you can verify the surrounding bits using m.match(buffer, i, j), where i is the index of first matching byte and j is the index of the byte after the last matching byte (such that buffer[i:j] == match_bytes).

In buffer above, the bit sequence can be found starting at bit 5, which means that _masks[4].match_bytes can be found at buffer[1:3]. As a result:

>> m.match(buffer, 1, 3)
True

(Feel free to use, adapt, modify, sell or torture this code in any way possible. I quite enjoyed putting it together - an interesting problem - though I won't be held liable for any bugs, so make sure you test it thoroughly!)

a√萤火虫的光℡ 2024-10-23 06:51:39

在字节数据中搜索位模式比典型搜索更具挑战性。通常的算法并不总是能很好地工作,因为从字节数据中提取每一位都是有成本的,而且只有两个字符的“字母表”,所以碰巧 50% 的比较会匹配(这使得许多算法更效率较低)。

您提到尝试使用 bitstring 模块(我编写的),但它太慢了。这对我来说并不奇怪,所以如果有人对如何做到这一点有任何好主意,我会关注!但 bitstring 的方式建议您可能会加快速度:

为了进行匹配,bitstring 将字节数据块转换为“0”和“1”的普通字符串,然后使用 Python 的 find 方法快速搜索一下。很多时间都花在将数据转换为字符串上,但是当您多次搜索相同的数据时,可以节省大量时间。

masks = ['0000101010100101', '010100011110110101101', '01010101101']
byte_data_chunk = bytearray('blahblahblah')
# convert to a string with one character per bit
# uses lookup table not given here!
s = ''.join(BYTE_TO_BITS[x] for x in byte_data_chunk)
for mask in masks:
    p = s.find(mask)
    # etc.

重点是,一旦转换为普通字符串,您就可以使用内置的 find(经过非常优化)来搜索每个掩码。当您使用位串时,它必须对每个掩码进行转换,这会降低性能。

Searching for bit patterns within byte data is a little more challenging than typical searches. The usual algorithms don't always work well as there is a cost for extracting each bit from the byte data and there is only an 'alphabet' of two characters, so just by chance 50% of comparisons will match (this makes many algorithms much less efficient).

You mentioned trying the bitstring module (which I wrote) but that it was too slow. That's not too surprising to me so if anyone has any great ideas on how to do this I'm paying attention! But the way bitstring does it suggests a possible speed up for you:

To do the match bitstring converts chunks of the byte data into ordinary strings of '0's and '1', and then uses Python's find method to do a quick search. Lots of the time is spent converting the data to a string, but as you are searching in the same data multiple times there's a big saving to be had.

masks = ['0000101010100101', '010100011110110101101', '01010101101']
byte_data_chunk = bytearray('blahblahblah')
# convert to a string with one character per bit
# uses lookup table not given here!
s = ''.join(BYTE_TO_BITS[x] for x in byte_data_chunk)
for mask in masks:
    p = s.find(mask)
    # etc.

The point is that once you convert to an ordinary string you can use the built-in find, which is very well optimised, to search for each of your masks. When you used bitstring it had to do the conversion for every mask which would have killed performance.

~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文