循环矢量化 - 用掩模的7字节记录的计数匹配

发布于 2025-01-19 13:50:52 字数 8686 浏览 1 评论 0原文

我有一个相当简单的循环:

auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
    equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
    rowsFound += equals;
    indexRecord += byteSize; // byteSize is 7
}

其中 BitString::equals 是:

static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
    return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}

此代码用于模拟数据库中的位图索引查询。 我的问题是,是否有一种方法可以矢量化循环,遍历所有记录。 当尝试使用 GCC 和 -fopt-info-vec-missed -O3 进行编译时,我得到:missed:无法矢量化循环

我对这种优化很陌生,想了解更多,只是感觉我错过了一些东西。

编辑 首先谢谢大家的解答。我应该包括一个 Reprex。 现在就在这里,拥有所需的所有功能,我已经尽可能接近了。所有这些都是在 x86-64 平台上完成的,我同时拥有 GCC 和 Clang。

#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>

constexpr short BYTE_SIZE = 8;

class BitString {
public:
    static int getByteSizeFromBits(int bitSize) {
        return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
    }

    static void setBitString(char *rec, int bitOffset) {
        rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
    }

    static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
        return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
    }
};

// Class representing a table schema
class TableSchema {
public:
    // number of attributes of a table
    unsigned int attrs_count = -1;
    // the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
    unsigned int *attr_sizes = nullptr;
    // max value (domain) of an attribute, -1 for unlimited, ()
    int *attr_max_values = nullptr;
    // the offset of each attribute, to simplify some pointer arithmetic for further use
    unsigned int *attribute_offsets = nullptr;
    // sum of attr_sizes if the record size;
    unsigned int record_size = -1;

    void calculate_offsets() {
        if (attrs_count <= 0 || attribute_offsets != nullptr) {
            return;
        }

        attribute_offsets = new unsigned int[attrs_count];
        int offset = 0;
        for (int i = 0; i < attrs_count; ++i) {
            attribute_offsets[i] = offset;
            offset += attr_sizes[i];
        }
        record_size = offset;
    }

    TableSchema() = default;

    ~TableSchema() {
        if (attribute_offsets != nullptr) {
            delete[] attribute_offsets;
            attribute_offsets = nullptr;
        }
        attrs_count = -1;
    }
};


class BitmapIndex {
private:
    char *mData = nullptr;
    short bitSize = 0;
    int byteSize = 0;
    int attrsCount = 0;
    int *attrsMaxValue = nullptr;
    int *bitIndexAttributeOffset = nullptr;
    unsigned int recordCount = 0;
    char *SelectMask;

    unsigned int capacity = 0;

    inline char *getRowPointer(unsigned int rowId) const {
        return mData + rowId * byteSize;
    }

    inline bool shouldColBeIndexed(int max_col_value) const {
        return max_col_value > 0;
    }

public:
    BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
        auto maxValuesSum = 0;
        attrsMaxValue = new int[attrs_count];
        attrsCount = attrs_count;
        bitIndexAttributeOffset = new int[attrs_count];
        auto bitOffset = 0;
        // attribute's max value is the same as number of bits used to encode the current value
        // e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
        for (int i = 0; i < attrs_count; ++i) {
            attrsMaxValue[i] = attrs_max_value[i];
            bitIndexAttributeOffset[i] = bitOffset;
            // col is indexed only if it's max value is > 0, -1 means
            if (!shouldColBeIndexed(attrs_max_value[i]))
                continue;
            maxValuesSum += attrs_max_value[i];
            bitOffset += attrs_max_value[i];
        }
        bitSize = (short) maxValuesSum;
        byteSize = BitString::getByteSizeFromBits(bitSize);
        mData = new char[byteSize * capacity];
        memset(mData, 0, byteSize * capacity);
        SelectMask = new char[byteSize];
        this->capacity = capacity;
    }

    ~BitmapIndex() {
        if (mData != nullptr) {
            delete[] mData;
            mData = nullptr;
            delete[] attrsMaxValue;
            attrsMaxValue = nullptr;

            delete[] SelectMask;
            SelectMask = nullptr;
        }
    }

    unsigned long getTotalByteSize() const {
        return byteSize * capacity;
    }

    // add record to index
    void addRecord(const char * record, const unsigned int * attribute_sizes) {
        auto indexRecord = getRowPointer(recordCount);
        unsigned int offset = 0;
        for (int j = 0; j < attrsCount; ++j) {
            if (attrsMaxValue[j] != -1) {
                // byte col value
                char colValue = *(record + offset);
                if (colValue > attrsMaxValue[j]) {
                    throw std::runtime_error("Col value is bigger than max allowed value!");
                }
//            printf("%d ", colValue);
                BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
            }
            offset += attribute_sizes[j];
        }
        recordCount += 1;
    }

    // SELECT COUNT(*)
    int Select(const char *query) const {
        uint64_t rowsFound = 0;
        memset(SelectMask, 0, byteSize);
        for (int col = 0; col < attrsCount; ++col) {
            if (!shouldColBeIndexed(attrsMaxValue[col])) {
                continue;
            }
            auto col_value = query[col];
            if (col_value < 0) {
                for (int i = 0; i < attrsMaxValue[col]; ++i) {
                    BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
                }
            } else {
                BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
            }
        }

        uint64_t maxBytesValue = 0;
        uint64_t byteVals = 0xff;
        for (int i = 0; i < byteSize; ++i) {
            maxBytesValue |= byteVals << (i * 8);
        }

        auto indexRecord = getRowPointer(0);
        for (int i = 0; i < recordCount; ++i) {
            rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
            indexRecord += byteSize;
        }
        return rowsFound;
    }
};


void generateRecord(
        char *record,
        const unsigned int attr_sizes[],
        const int attr_max_value[],
        int attr_count
    ) {
    auto offset = 0;
    for (int c = 0; c < attr_count; ++c) {
        if (attr_max_value[c] == -1) {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % 256;
            }
        } else {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % attr_max_value[c];
            }
        }
        offset += attr_sizes[c];
    }
}

int main() {
    TableSchema schema;
    const int attribute_count = 13;
    const int record_count = 1000000;
    // for simplicity sake, attr_max_value > 0 is set only for attributes, which size is 1.
    unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
    int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
    schema.attrs_count = attribute_count;
    schema.attr_sizes = attr_sizes;
    schema.attr_max_values = attr_max_values;
    schema.calculate_offsets();

    srand((unsigned ) time(nullptr));

    BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);

    char *record = new char[schema.record_size];
    for (int i = 0; i < record_count; ++i) {
        // generate some random records and add them to the index
        generateRecord(record, attr_sizes, attr_max_values, attribute_count);
        bitmapIndex.addRecord(record, attr_sizes);
    }

    char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
    // simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
    auto found = bitmapIndex.Select(query);

    printf("Query found: %d records\n", found);

    delete[] record;
    return 0;
}

I have a fairly simple loop:

auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
    equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
    rowsFound += equals;
    indexRecord += byteSize; // byteSize is 7
}

Where BitString::equals is:

static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
    return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}

This code is used to simulate a Bitmap Index querying in databases.
My question is, if there's a way to vectorize the loop, going through all the records.
When trying to compile with GCC and -fopt-info-vec-missed -O3 I am getting: missed: couldn't vectorize loop.

I am new to this kind of optimizations and would like to learn more, it just feels like I am missing something.

EDIT
First of all, thank you all for answers. I should've included a Reprex.
Here it is now, with all functionality needed, as close as possible I could've done. All of this is done on x86-64 platform and I have both GCC and Clang available.

#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>

constexpr short BYTE_SIZE = 8;

class BitString {
public:
    static int getByteSizeFromBits(int bitSize) {
        return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
    }

    static void setBitString(char *rec, int bitOffset) {
        rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
    }

    static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
        return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
    }
};

// Class representing a table schema
class TableSchema {
public:
    // number of attributes of a table
    unsigned int attrs_count = -1;
    // the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
    unsigned int *attr_sizes = nullptr;
    // max value (domain) of an attribute, -1 for unlimited, ()
    int *attr_max_values = nullptr;
    // the offset of each attribute, to simplify some pointer arithmetic for further use
    unsigned int *attribute_offsets = nullptr;
    // sum of attr_sizes if the record size;
    unsigned int record_size = -1;

    void calculate_offsets() {
        if (attrs_count <= 0 || attribute_offsets != nullptr) {
            return;
        }

        attribute_offsets = new unsigned int[attrs_count];
        int offset = 0;
        for (int i = 0; i < attrs_count; ++i) {
            attribute_offsets[i] = offset;
            offset += attr_sizes[i];
        }
        record_size = offset;
    }

    TableSchema() = default;

    ~TableSchema() {
        if (attribute_offsets != nullptr) {
            delete[] attribute_offsets;
            attribute_offsets = nullptr;
        }
        attrs_count = -1;
    }
};


class BitmapIndex {
private:
    char *mData = nullptr;
    short bitSize = 0;
    int byteSize = 0;
    int attrsCount = 0;
    int *attrsMaxValue = nullptr;
    int *bitIndexAttributeOffset = nullptr;
    unsigned int recordCount = 0;
    char *SelectMask;

    unsigned int capacity = 0;

    inline char *getRowPointer(unsigned int rowId) const {
        return mData + rowId * byteSize;
    }

    inline bool shouldColBeIndexed(int max_col_value) const {
        return max_col_value > 0;
    }

public:
    BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
        auto maxValuesSum = 0;
        attrsMaxValue = new int[attrs_count];
        attrsCount = attrs_count;
        bitIndexAttributeOffset = new int[attrs_count];
        auto bitOffset = 0;
        // attribute's max value is the same as number of bits used to encode the current value
        // e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
        for (int i = 0; i < attrs_count; ++i) {
            attrsMaxValue[i] = attrs_max_value[i];
            bitIndexAttributeOffset[i] = bitOffset;
            // col is indexed only if it's max value is > 0, -1 means
            if (!shouldColBeIndexed(attrs_max_value[i]))
                continue;
            maxValuesSum += attrs_max_value[i];
            bitOffset += attrs_max_value[i];
        }
        bitSize = (short) maxValuesSum;
        byteSize = BitString::getByteSizeFromBits(bitSize);
        mData = new char[byteSize * capacity];
        memset(mData, 0, byteSize * capacity);
        SelectMask = new char[byteSize];
        this->capacity = capacity;
    }

    ~BitmapIndex() {
        if (mData != nullptr) {
            delete[] mData;
            mData = nullptr;
            delete[] attrsMaxValue;
            attrsMaxValue = nullptr;

            delete[] SelectMask;
            SelectMask = nullptr;
        }
    }

    unsigned long getTotalByteSize() const {
        return byteSize * capacity;
    }

    // add record to index
    void addRecord(const char * record, const unsigned int * attribute_sizes) {
        auto indexRecord = getRowPointer(recordCount);
        unsigned int offset = 0;
        for (int j = 0; j < attrsCount; ++j) {
            if (attrsMaxValue[j] != -1) {
                // byte col value
                char colValue = *(record + offset);
                if (colValue > attrsMaxValue[j]) {
                    throw std::runtime_error("Col value is bigger than max allowed value!");
                }
//            printf("%d ", colValue);
                BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
            }
            offset += attribute_sizes[j];
        }
        recordCount += 1;
    }

    // SELECT COUNT(*)
    int Select(const char *query) const {
        uint64_t rowsFound = 0;
        memset(SelectMask, 0, byteSize);
        for (int col = 0; col < attrsCount; ++col) {
            if (!shouldColBeIndexed(attrsMaxValue[col])) {
                continue;
            }
            auto col_value = query[col];
            if (col_value < 0) {
                for (int i = 0; i < attrsMaxValue[col]; ++i) {
                    BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
                }
            } else {
                BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
            }
        }

        uint64_t maxBytesValue = 0;
        uint64_t byteVals = 0xff;
        for (int i = 0; i < byteSize; ++i) {
            maxBytesValue |= byteVals << (i * 8);
        }

        auto indexRecord = getRowPointer(0);
        for (int i = 0; i < recordCount; ++i) {
            rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
            indexRecord += byteSize;
        }
        return rowsFound;
    }
};


void generateRecord(
        char *record,
        const unsigned int attr_sizes[],
        const int attr_max_value[],
        int attr_count
    ) {
    auto offset = 0;
    for (int c = 0; c < attr_count; ++c) {
        if (attr_max_value[c] == -1) {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % 256;
            }
        } else {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % attr_max_value[c];
            }
        }
        offset += attr_sizes[c];
    }
}

int main() {
    TableSchema schema;
    const int attribute_count = 13;
    const int record_count = 1000000;
    // for simplicity sake, attr_max_value > 0 is set only for attributes, which size is 1.
    unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
    int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
    schema.attrs_count = attribute_count;
    schema.attr_sizes = attr_sizes;
    schema.attr_max_values = attr_max_values;
    schema.calculate_offsets();

    srand((unsigned ) time(nullptr));

    BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);

    char *record = new char[schema.record_size];
    for (int i = 0; i < record_count; ++i) {
        // generate some random records and add them to the index
        generateRecord(record, attr_sizes, attr_max_values, attribute_count);
        bitmapIndex.addRecord(record, attr_sizes);
    }

    char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
    // simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
    auto found = bitmapIndex.Select(query);

    printf("Query found: %d records\n", found);

    delete[] record;
    return 0;
}

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。

评论(3

甜警司 2025-01-26 13:50:52

如果记录大小为 8,GCC 和 Clang 都会自动向量化,例如:(希望有足够的代表性代表代码发生的实际上下文)

int count(char * indexRecord, const char * SelectMask, uint64_t maxVal)
{
    bool equals;
    uint64_t rowsFound = 0;
    // some arbitrary number of records
    for (int i = 0; i < 1000000; ++i) {
        equals = tequals(SelectMask, indexRecord, maxVal);
        rowsFound += equals;
        indexRecord += 8; // record size padded out to 8
    }
    return rowsFound;
}

它的重要部分,由 GCC 编译,看起来像这样:

.L4:
    vpand   ymm0, ymm2, YMMWORD PTR [rdi]
    add     rdi, 32
    vpcmpeqq        ymm0, ymm0, ymm3
    vpsubq  ymm1, ymm1, ymm0
    cmp     rax, rdi
    jne     .L4

还不错。它使用的想法与我手动使用的相同:vpand 带掩码的数据(按位逻辑的简化),将其与零进行比较,减去比较的结果(减去,因为 True 结果是用 -1) 表示,来自包装在向量中的 4 个计数器。四个单独的计数在循环后相加。

顺便说一句,请注意,我将 rowsFound 设为 uint64_t。这很重要。如果 rowsFound 不是 64 位,那么 Clang 和 GCC 都会非常努力地尽快缩小计数范围,这与好方法完全相反:这会在循环中花费更多指令,并且没有任何好处。如果计数最终打算是 32 位 int,则可以在循环后简单地缩小它的范围,这样做可能不仅便宜,而且实际上免费

使用 SIMD 内在函数手动编写与该代码等效的东西并不困难,这可以使代码不那么脆弱(它不会基于希望编译器会做正确的事情),但它不适用于非x86 平台不再是了。

如果记录应该是 7 字节,那就是一个更烦人的问题了。 GCC 放弃了,Clang 实际上继续进行自动向量化,但这并不好:8 字节加载都是单独完成的,然后将结果放在一个向量中,这完全是浪费时间。

当使用 SIMD 内在函数手动执行此操作时,主要问题是将 7 字节记录解包到 qword 通道中。 SSE4.1 版本可以使用 pshufbpshufb 来自 SSSE3,但 pcmpeqq 来自 SSE4.1,因此以 SSE4 为目标是有意义的。 1)做到这一点,很容易。 AVX2 版本可以在尝试加载的第一个记录之前 2 个字节开始加载,这样 256 位寄存器的两个 128 位半部之间的“分割”就落在两个记录之间。然后,vpshufb 无法将字节从一个 128 位一半移动到另一半,但仍然可以将字节移动到位,因为它们都不需要交叉到另一半。

例如,具有手动矢量化和 7 字节记录的 AVX2 版本可能如下所示。这需要在末尾和开头进行一些填充,或者在命中最后一条记录之前跳过第一个记录并结束并单独处理它们。未经测试,但它至少可以让您了解手动矢量化的代码如何工作。

int count(char * indexRecord, uint64_t SelectMask, uint64_t maxVal)
{
    __m256i mask = _mm256_set1_epi64x(~SelectMask & maxVal);
    __m256i count = _mm256_setzero_si256();
    __m256i zero = _mm256_setzero_si256();
    __m256i shufmask = _mm256_setr_epi8(2, 3, 4, 5, 6, 7, 8, -1, 9, 10, 11, 12, 13, 14, 15, -1, 0, 1, 2, 3, 4, 5, 6, -1, 7, 8, 9, 10, 11, 12, 13, -1);
    for (int i = 0; i < 1000000; ++i) {
        __m256i records = _mm256_loadu_si256((__m256i*)(indexRecord - 2));
        indexRecord += 7 * 4;
        records = _mm256_shuffle_epi8(records, shufmask);
        __m256i isZero = _mm256_cmpeq_epi64(_mm256_and_si256(records, mask), zero);
        count = _mm256_sub_epi64(count, isZero);
    }
    __m128i countA = _mm256_castsi256_si128(count);
    __m128i countB = _mm256_extracti128_si256(count, 1);
    countA = _mm_add_epi64(countA, countB);
    return _mm_cvtsi128_si64(countA) + _mm_extract_epi64(countA, 1);
}

If the record size was 8, both GCC and Clang would autovectorize, for example: (hopefully a sufficiently representative stand-in for your actual context in which the code occurs)

int count(char * indexRecord, const char * SelectMask, uint64_t maxVal)
{
    bool equals;
    uint64_t rowsFound = 0;
    // some arbitrary number of records
    for (int i = 0; i < 1000000; ++i) {
        equals = tequals(SelectMask, indexRecord, maxVal);
        rowsFound += equals;
        indexRecord += 8; // record size padded out to 8
    }
    return rowsFound;
}

The important part of it, as compiled by GCC, looks like this:

.L4:
    vpand   ymm0, ymm2, YMMWORD PTR [rdi]
    add     rdi, 32
    vpcmpeqq        ymm0, ymm0, ymm3
    vpsubq  ymm1, ymm1, ymm0
    cmp     rax, rdi
    jne     .L4

Not bad. It uses the same ideas that I would used manually: vpand the data with a mask (simplification of your bitwise logic), compare it to zero, subtract the results of the comparisons (subtract because a True result is indicated with -1) from 4 counters packed in a vector. The four separate counts are added after the loop.

By the way, note that I made rowsFound an uint64_t. That's important. If rowsFound is not 64-bit, then both Clang and GCC will try very hard to narrow the count ASAP, which is exactly the opposite of a good approach: that costs many more instructions in the loop, and has no benefit. If the count is intended to be a 32-bit int in the end, it can simply be narrowed after the loop, where it is probably not merely cheap but actually free to do that.

Something equivalent to that code would not be difficult to write manually with SIMD intrinsics, that could make the code less brittle (it wouldn't be based on hoping that compilers will do the right thing), but it wouldn't work for non-x86 platforms anymore.

If the records are supposed to be 7-byte, that's a more annoying problem to deal with. GCC gives up, Clang actually goes ahead with its auto-vectorization, but it's not good: the 8-byte loads are all done individually, the results then put together in a vector, which is all a big waste of time.

When doing it manually with SIMD intrinsics, the main problems would be unpacking the 7-byte records into qword lanes. An SSE4.1 version could use pshufb (pshufb is from SSSE3, but pcmpeqq is from SSE4.1 so it makes sense to target SSE4.1) to do this, easy. An AVX2 version could do a load that starts 2 bytes before the first record that it's trying to load, such that the "split" between the two 128-bit halves of the 256-bit registers falls between two records. Then vpshufb, which cannot move bytes from one 128-bit half to the other, can still move the bytes into place because none of them need to cross into the other half.

For example, an AVX2 version with manual vectorization and 7-byte records could look something like this. This requires either some padding at both the end and the start, or just skip the first record and end before hitting the last record and handle those separately. Not tested, but it would at least give you some idea of how code with manual vectorization would work.

int count(char * indexRecord, uint64_t SelectMask, uint64_t maxVal)
{
    __m256i mask = _mm256_set1_epi64x(~SelectMask & maxVal);
    __m256i count = _mm256_setzero_si256();
    __m256i zero = _mm256_setzero_si256();
    __m256i shufmask = _mm256_setr_epi8(2, 3, 4, 5, 6, 7, 8, -1, 9, 10, 11, 12, 13, 14, 15, -1, 0, 1, 2, 3, 4, 5, 6, -1, 7, 8, 9, 10, 11, 12, 13, -1);
    for (int i = 0; i < 1000000; ++i) {
        __m256i records = _mm256_loadu_si256((__m256i*)(indexRecord - 2));
        indexRecord += 7 * 4;
        records = _mm256_shuffle_epi8(records, shufmask);
        __m256i isZero = _mm256_cmpeq_epi64(_mm256_and_si256(records, mask), zero);
        count = _mm256_sub_epi64(count, isZero);
    }
    __m128i countA = _mm256_castsi256_si128(count);
    __m128i countB = _mm256_extracti128_si256(count, 1);
    countA = _mm_add_epi64(countA, countB);
    return _mm_cvtsi128_si64(countA) + _mm_extract_epi64(countA, 1);
}
空城缀染半城烟沙 2025-01-26 13:50:52

这是另一种方法。此代码不使用未对齐的加载技巧(如果将输入数据对齐 16 字节,则特别有价值),但总体上使用更多指令,因为更多的洗牌,并且仅对 16 字节 SSE 向量进行操作。

我不知道它与其他答案相比如何,可能更快或更慢。该代码需要 SSSE3 和 SSE 4.1 指令集。

// Load 7 bytes from memory into the vector
inline __m128i load7( const uint8_t* rsi )
{
    __m128i v = _mm_loadu_si32( rsi );
    v = _mm_insert_epi16( v, *(const uint16_t*)( rsi + 4 ), 2 );
    v = _mm_insert_epi8( v, rsi[ 6 ], 6 );
    return v;
}

// Prepare mask vector: broadcast the mask, and duplicate the high byte
inline __m128i loadMask( uint64_t mask )
{
    __m128i vec = _mm_cvtsi64_si128( (int64_t)mask );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    return _mm_shuffle_epi8( vec, perm );
}

// Prepare needle vector: load 7 bytes, duplicate 7-th byte into 8-th, duplicate 8-byte lanes
inline __m128i loadNeedle( const uint8_t* needlePointer, __m128i mask )
{
    __m128i vec = load7( needlePointer );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    vec = _mm_shuffle_epi8( vec, perm );
    return _mm_and_si128( vec, mask );
}

// Compare first 14 bytes with the needle, update the accumulator
inline void compare14( __m128i& acc, __m128i vec, __m128i needle, __m128i mask )
{
    // Shuffle the vector matching the needle and mask; this duplicates two last bytes of each 7-byte record
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10, 11, 12, 13, 13 );
    vec = _mm_shuffle_epi8( vec, perm );
    // bitwise AND with the mask
    vec = _mm_and_si128( vec, mask );
    // Compare 8-byte lanes for equality with the needle
    vec = _mm_cmpeq_epi64( vec, needle );
    // Increment the accumulator if comparison was true
    acc = _mm_sub_epi64( acc, vec );
}

size_t countRecords( const uint8_t* rsi, size_t count, const uint8_t* needlePointer, uint64_t maskValue )
{
    const __m128i mask = loadMask( maskValue );
    const __m128i needle = loadNeedle( needlePointer, mask );
    __m128i acc = _mm_setzero_si128();

    // An iteration of this loop consumes 16 records = 112 bytes = 7 SSE vectors
    const size_t countBlocks = count / 16;
    for( size_t i = 0; i < countBlocks; i++ )
    {
        const __m128i* p = ( const __m128i* )rsi;
        rsi += 7 * 16;

        __m128i a = _mm_loadu_si128( p );
        compare14( acc, a, needle, mask );

        __m128i b = _mm_loadu_si128( p + 1 );
        compare14( acc, _mm_alignr_epi8( b, a, 14 ), needle, mask );

        a = _mm_loadu_si128( p + 2 );
        compare14( acc, _mm_alignr_epi8( a, b, 12 ), needle, mask );

        b = _mm_loadu_si128( p + 3 );
        compare14( acc, _mm_alignr_epi8( b, a, 10 ), needle, mask );

        a = _mm_loadu_si128( p + 4 );
        compare14( acc, _mm_alignr_epi8( a, b, 8 ), needle, mask );

        b = _mm_loadu_si128( p + 5 );
        compare14( acc, _mm_alignr_epi8( b, a, 6 ), needle, mask );

        a = _mm_loadu_si128( p + 6 );
        compare14( acc, _mm_alignr_epi8( a, b, 4 ), needle, mask );
        compare14( acc, _mm_srli_si128( a, 2 ), needle, mask );
    }

    // Sum high / low lanes of the accumulator
    acc = _mm_add_epi64( acc, _mm_srli_si128( acc, 8 ) );

    // Handle the remainder, 7 bytes per iteration
    // Compared to your 6M records, the remainder is small, the performance doesn't matter much.
    for( size_t i = 0; i < count % 16; i++ )
    {
        __m128i a = load7( rsi );
        rsi += 7;
        compare14( acc, a, needle, mask );
    }

    return (size_t)_mm_cvtsi128_si64( acc );
}

PS 此外,尽管 RAM 带宽开销为 15%,但我预计 8 字节索引会更快。尤其是在矢量化为 AVX2 时。

Here’s another approach. This code doesn’t use unaligned load tricks (especially valuable if you align your input data by 16 bytes), but uses more instructions overall because more shuffles, and only operates on 16-byte SSE vectors.

I have no idea how it compares to the other answers, may be either faster or slower. The code requires SSSE3 and SSE 4.1 instructions sets.

// Load 7 bytes from memory into the vector
inline __m128i load7( const uint8_t* rsi )
{
    __m128i v = _mm_loadu_si32( rsi );
    v = _mm_insert_epi16( v, *(const uint16_t*)( rsi + 4 ), 2 );
    v = _mm_insert_epi8( v, rsi[ 6 ], 6 );
    return v;
}

// Prepare mask vector: broadcast the mask, and duplicate the high byte
inline __m128i loadMask( uint64_t mask )
{
    __m128i vec = _mm_cvtsi64_si128( (int64_t)mask );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    return _mm_shuffle_epi8( vec, perm );
}

// Prepare needle vector: load 7 bytes, duplicate 7-th byte into 8-th, duplicate 8-byte lanes
inline __m128i loadNeedle( const uint8_t* needlePointer, __m128i mask )
{
    __m128i vec = load7( needlePointer );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    vec = _mm_shuffle_epi8( vec, perm );
    return _mm_and_si128( vec, mask );
}

// Compare first 14 bytes with the needle, update the accumulator
inline void compare14( __m128i& acc, __m128i vec, __m128i needle, __m128i mask )
{
    // Shuffle the vector matching the needle and mask; this duplicates two last bytes of each 7-byte record
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10, 11, 12, 13, 13 );
    vec = _mm_shuffle_epi8( vec, perm );
    // bitwise AND with the mask
    vec = _mm_and_si128( vec, mask );
    // Compare 8-byte lanes for equality with the needle
    vec = _mm_cmpeq_epi64( vec, needle );
    // Increment the accumulator if comparison was true
    acc = _mm_sub_epi64( acc, vec );
}

size_t countRecords( const uint8_t* rsi, size_t count, const uint8_t* needlePointer, uint64_t maskValue )
{
    const __m128i mask = loadMask( maskValue );
    const __m128i needle = loadNeedle( needlePointer, mask );
    __m128i acc = _mm_setzero_si128();

    // An iteration of this loop consumes 16 records = 112 bytes = 7 SSE vectors
    const size_t countBlocks = count / 16;
    for( size_t i = 0; i < countBlocks; i++ )
    {
        const __m128i* p = ( const __m128i* )rsi;
        rsi += 7 * 16;

        __m128i a = _mm_loadu_si128( p );
        compare14( acc, a, needle, mask );

        __m128i b = _mm_loadu_si128( p + 1 );
        compare14( acc, _mm_alignr_epi8( b, a, 14 ), needle, mask );

        a = _mm_loadu_si128( p + 2 );
        compare14( acc, _mm_alignr_epi8( a, b, 12 ), needle, mask );

        b = _mm_loadu_si128( p + 3 );
        compare14( acc, _mm_alignr_epi8( b, a, 10 ), needle, mask );

        a = _mm_loadu_si128( p + 4 );
        compare14( acc, _mm_alignr_epi8( a, b, 8 ), needle, mask );

        b = _mm_loadu_si128( p + 5 );
        compare14( acc, _mm_alignr_epi8( b, a, 6 ), needle, mask );

        a = _mm_loadu_si128( p + 6 );
        compare14( acc, _mm_alignr_epi8( a, b, 4 ), needle, mask );
        compare14( acc, _mm_srli_si128( a, 2 ), needle, mask );
    }

    // Sum high / low lanes of the accumulator
    acc = _mm_add_epi64( acc, _mm_srli_si128( acc, 8 ) );

    // Handle the remainder, 7 bytes per iteration
    // Compared to your 6M records, the remainder is small, the performance doesn't matter much.
    for( size_t i = 0; i < count % 16; i++ )
    {
        __m128i a = load7( rsi );
        rsi += 7;
        compare14( acc, a, needle, mask );
    }

    return (size_t)_mm_cvtsi128_si64( acc );
}

P.S. Also, I would expect 8-byte indices to be faster despite the 15% RAM bandwidth overhead. Especially when vectorizing into AVX2.

触ぅ动初心 2025-01-26 13:50:52

首先,您的代码不是完整的示例。您缺少许多变量的定义和类型,这使得很难回答。您也没有指出您正在哪个平台上进行编译。

以下是矢量化可能失败的原因:

  • 您的读取重叠!您以 7 字节间隔读取 8 字节。仅此一点就可能会混淆矢量化逻辑。
  • 您的指针可能不会被__restrict'ed,这意味着编译器必须假设它们可能是别名,这意味着它可能需要在每次访问时从该地址重新读取。
  • 您的equals()函数指针参数绝对不是__restrict'ed(尽管编译器可以通过内联看到它)。
  • 结盟。 x86_64 处理器不需要对齐访问,但在某些平台上,一些较大的指令需要知道它们在内存中正确对齐的位置上工作。此外,正如 @PeterCordes 在评论中指出的那样,编译器和库在对齐方面可能比硬件更挑剔。
  • 为什么不将 *SelectMask 放入局部变量中?

First, your code is not a complete example. You're missing definitions and types of many variables, which makes it difficult to answer. You also did not indicate which platform you're compiling on/for.

Here are reasons why vectorization might fail:

  • Your reads are overlapping! you're reading 8 bytes at 7-byte intervals. That alone might confuse the vectorization logic.
  • Your pointers may not be __restrict'ed, meaning that the compiler must assume they might alias, meaning that it might need to reread from the address on every access.
  • Your equals() function pointer parameters are definitely not __restrict'ed (although the compiler could be seeing through that with inlining).
  • Alignment. x86_64 processors do not require aligned accesses, but on some platforms, some larger instructions need to know they work on properly aligned places in memory. Moreover, as @PeterCordes points out in a comment, compilers and libraries may be more picky than the hardware regarding alignment.
  • Why don't you put *SelectMask in a local variable?
~没有更多了~
我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
原文