在 ClickHouse 中如何实现矢量化执行

发布于 2024-12-27 12:41:02 字数 17657 浏览 4 评论 0

ClickHouse 是一个性能卓越的 OLAP 计算引擎,自开源以来,受到了业内的广泛青睐。而 ClickHouse 极致性能背后的关键因素是 真正的列式存储极致性能的向量化执行引擎

向量化简介

什么是向量化

向量化是提升 OLAP 查询性能的一种常用技术。向量化是指对不同的数据执行同样的一个或一批指令,或者说把指令应用于一个数组/向量,通过 CPU 数据并行,实现单指令、多数据(简称 SIMD)。

通俗地说,对一个数组进行连续操作,即可看作向量化:

从 CPU 流水线角度来看,向量化能够充分填满 CPU 的计算单元,其本质是采用一个控制器来控制多个处理器,同时对一组数据中的每一条执行相同的操作,实现空间上的并行:

  • 单指令流:同时只能执行一种操作;
  • 多数据流:在一组同构(向量)的数据上进行操作;

向量化依赖于 CPU 提供的硬件支持,例如,Intel CPU 提供了一系列的 SSE、AXV 扩展向量化指令集,在 Linux 命令行执行 cat /proc/cpuinfo ,其输出包含如下内容:

其中,sse*,avx* 即为该 CPU 支持的向量化指令集。

向量化的局限

向量化不是十全十美的,其主要被用来优化可并行计算的简单场景,以及可能被频繁调用的基础逻辑,并非所有场景都适合于向量化。在严重依赖于控制流,即包含大量分支、跳转和条件判断语句的任务中,则难以实现向量化。

例如,向量化不能以不同的方式来处理不同的数据:

如何实现向量化

在程序中实现向量化的方式多种多样,包括:

  • 嵌入式汇编:直接在程序中嵌入向量化汇编指令
  • 使用 Intel 提供的 SIMD 内置函数
  • 通过 Cilk 库提供的 for 循环
  • 通过编译器制导语句,如 #pragma simd
  • 编译器自动向量化
  • 使用高性能库,如 IPPMKL

在上述方式中,从上往下,易用性越高,可控性越低。其中,自动向量化和内置函数在 通用性易用性 方面更好,因此也是 OLAP 系统使用最为广泛的两种向量化技术。

示例

例如,使用 SSE __mm_add_ps 内置函数,一次实现 8 个单精度浮点数的加法:

int main()
{
	__m128 v0 = _mm_set_ps(1.0f, 2.0f, 3.0f, 4.0f);
	__m128 v1 = _mm_set_ps(1.0f, 2.0f, 3.0f, 4.0f);
	__m128 v2 = _mm_add_ps(v0, v1);
	float result[4];
	_mm_storeu_ps(result, v2);
}

在上述代码中,首先通过 _mm_set_ps 函数创建两个包含 4 个浮点数的 128 位向量,之后,通过 _mm_add_ps 来完成两个向量的加法,通过三条向量化指令,即完成了 8 个浮点数的相加。

另外,对于向量化友好的代码,经过编译器编译后,能够自动使用向量化指令进行优化:

上述数组求和的代码经过编译器编译优化后生成的汇编代码如上图右半部分所示,其中使用了大量的向量化指令, xmm 系列寄存器均为 128 位。

ClickHouse 向量化执行

在 ClickHouse 中,向量化的实现主要通过 SIMD 内置函数编译器自动向量化

  • 大量使用 SIMD 内置函数对关键代码路径进行优化
  • 通过良好的架构设计和代码设计,使得编译器能够生成良好的向量化代码。 关键:基于 Pipeline 的执行引擎设计,能够按列对数据进行处理。

SIMD 内置函数

在 ClickHouse 中, 大量使用了 SIMD 内置函数 ,来对关键代码路径进行优化。

例如,在 ClickHouse 底层,过滤器(Filter)是一个预分配空间、无符号 8 位整型数的数组,用于表示 WHEREHAVING 子句的条件及真值,每一位的值为 0 或者 1,表示条件为假或者真。

下面的代码计算 Filter 中 1 的个数,当环境支持 SSE2 指令,并且硬件支持 POPCNT 指令时,会通过向量化指令一次将 64 字节压缩成一个 64 位的无符号数,然后通过 __builtin_popcountll 指令计算出 64 位无符号数中包含的 1 的个数。

size_t countBytesInFilter(const UInt8 * filt, size_t sz)
{
	size_t count = 0;
	const Int8 * pos = reinterpret_cast<const Int8 *>(filt);
	const Int8 * end = pos + sz;

#if defined(__SSE2__) && defined(__POPCNT__)
	const Int8 * end64 = pos + sz / 64 * 64;
	for (; pos < end64; pos += 64)
		count += __builtin_popcountll(toBits64(pos));
#endif
	for (; pos < end; ++pos)
		count += *pos != 0;
	return count;
}

toBits64 函数完成 64 字节数到 64 位无符号数的压缩操作,其中使用到的 SIMD 内置函数包括:

  • _mm_setzero_si128() :初始化 128 位(16 字节)的全 0 位图;
  • _mm_loadu_si128(mem_addr) :从内存地址 mem_addr 处加载 128 位的整型数据;
  • _mm_cmpgt_epi8(a, b) :按 8 位比较 a 和 b 两个 128 位整型数,若 a 的对应 8 位比 b 的对应 8 位大,则填充对应位为全 1,否则填充全 0;
  • _mm_movemask_epi8(a) :根据 128 位整型数 a 的每个 8 位组的最高位创建掩码,一共 16 位长,返回 int 结果(高 16 位用 0 填充)。

在该函数中,每调用一次内置函数能够处理 16 字节的数据,刚好 128 位,从而实现向量化计算。由于 SSE 指令集中没有真正的位运算指令,所以压缩的过程略显繁琐,但仍然比逐个遍历判断效率要高很多。

static UInt64 toBits64(const Int8 * bytes64)
{
    static const __m128i zero16 = _mm_setzero_si128();
    UInt64 res =
        static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(
		    _mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64)), zero16)))
        | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(
		    _mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 16)), zero16))) << 16)
        | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(
		    _mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 32)), zero16))) << 32)
        | (static_cast<UInt64>(_mm_movemask_epi8(_mm_cmpeq_epi8(
		    _mm_loadu_si128(reinterpret_cast<const __m128i *>(bytes64 + 48)), zero16))) << 48);
    return ~res;
}

编译器自动向量化

编译器自动向量化是 ClickHouse 中实现向量化的最主要的方式,其关键是基于 Pipeline 的执行引擎,能够按列对数据进行处理。关于 Pipeline 执行引擎,会在下一节进行介绍。

在实现自动向量化时,ClickHouse 也通过许多方式对代码进行优化,从而提高向量化能力:

  • 代码设计中大量使用模板(template),对数据的类型和长度进行分派(dispatch): 消除分支跳转语句
  • 大量使用内联函数: 消除函数调用
  • 减少虚函数调用

下面,我们来看 ClickHouse 中 plus 这一个 UDF 的实现,该函数作用在两个列中,按位置完成两列数据的相加,例如,当我们输入 SQL SELECT a + b FROM t; 时,会被转换成 SELECT plus(a, b) FROM t; ,从而通过这一 UDF 来完成计算。

下面的代码是 plus 函数真正执行计算的代码,可以看出, process 函数的实现代码是非常完美的向量化优化代码:通过使用模板对类型和长度进行分派,在执行时,数组的类型和长度已知,没有任何分支跳转语句,同时 apply 函数已经被内联,因此不会发生函数调用。

template <typename A, typename B, typename Op, typename OpResultType = typename Op::ResultType>
struct BinaryOperation
{
    using ResultType = OpResultType;
    static const constexpr bool allow_fixed_string = false;

    template <OpCase op_case>
    static void NO_INLINE process(const A * __restrict a, const B * __restrict b, ResultType * __restrict c, size_t size)
    {
        for (size_t i = 0; i < size; ++i)
            if constexpr (op_case == OpCase::Vector)
                c[i] = Op::template apply<ResultType>(a[i], b[i]);
            else if constexpr (op_case == OpCase::LeftConstant)
                c[i] = Op::template apply<ResultType>(*a, b[i]);
            else
                c[i] = Op::template apply<ResultType>(a[i], *b);
    }

    static ResultType process(A a, B b) { return Op::template apply<ResultType>(a, b); }
};

template <typename Result = ResultType>
static inline NO_SANITIZE_UNDEFINED Result apply(A a, B b)
{
    /// Next everywhere, static_cast - so that there is no wrong result in expressions of the form Int64 c = UInt32(a) * Int32(-1).
    if constexpr (is_big_int_v<A> || is_big_int_v<B>)
    {
        using CastA = std::conditional_t<std::is_floating_point_v<B>, B, A>;
        using CastB = std::conditional_t<std::is_floating_point_v<A>, A, B>;

        return static_cast<Result>(static_cast<CastA>(a)) + static_cast<Result>(static_cast<CastB>(b));
    }
    else
        return static_cast<Result>(a) + b;
}

另外,值得一提的是 __restrict 修饰符,该关键字的含义是向编译器表明,在该指针的生命周期内,只有该指针本身或直接由它产生的指针能够用来访问该指针指向的对象,其作用是限制指针别名,从而帮助编译器进行优化。在 ClickHouse 中,有很多地方都使用到了该关键字,在 PR9304 中,通过该关键字,使查询的整体性能提升从 5% 到 200% 不等。

在 C++中, 消除虚函数调用也是提高向量化的一个重要手段 为例。在 ClickHouse 中,聚合函数能够把数据聚合到中间状态,聚合函数的接口类实现了 addBatch 方法,其会通过一个 for 循环调用 add 方法将一批数据聚合到中间状态,派生类需要实现 add 方法,这导致了虚函数的调用,无法实现 for 循环的向量化执行,通过消除 countIf 聚合函数的 addBatch 方法对 add 方法的虚函数调用,从而实现向量化执行,性能得到数倍提升。

Pipeline 执行引擎

Pipeline 执行引擎是 ClickHouse 实现向量化的关键。

传统火山模型 vs Pipeline 模型

在传统火山模型中,SQL 查询在数据库中经过解析,会生成一个查询树,查询树的每个节点为代数运算符(Operator)。火山模型把 Operator 看成迭代器,每个迭代器都会提供一个 next() 接口,该接口会递归调用子节点的 next() 接口获取一行数据,即一个 tuple,对这行数据进行特定处理,然后返回处理后的 tuple。

对于这样 Tupl-at-a-time 的处理逻辑,其优点是 处理逻辑清晰、简单 ,缺点是 以行为单位处理数据,CPU Cache 不友好,每获取一行数据都会涉及到大量的虚函数调用开销,CPU 利用率不高,无法发挥向量化能力

Pipeline 执行模型以 Block 为单位,按列对数据进行处理,易于实现向量化。ClickHouse 在执行过程中,数据按列(数组)的方式组织在内存中,查询执行的相关操作和函数都是基于向量进行调度执行,而不是在单个值上进行调用,提高了 CPU 缓存利用率,利于 CPU 的自动向量化编译优化。

Pipeline 执行引擎实现

在介绍 Pipeline 执行引擎实现之前,先简单看一下在 ClickHouse 中 SQL 的执行流程。

当一个 SQL 查询语句提交到 ClickHouse 中以后,首先会经过 Parser 解析成一棵抽象语法树(AST)。之后,Interpreter 会对 AST 进行一系列改写和优化,比如在逻辑相等时, sumIf 函数改写为性能更优的 countIf 函数 ,然后生成 QueryPlan,QueryPlan 是由 QueryStep 构成的一个树(除 Join 外,实际退化为链表),可近似将 QueryPlan 看成是物理执行计划,在 QueryPlan 的基础上,能够进行 Pipeline 级别的优化。最后,基于优化后的 QueryPlan 创建 QueryPipeline,由 Executor 完成 QueryPipeline 的调度执行。

QueryPipeline 即为本节所说的 Pipeline。

基本结构

Pipeline 的基本结构是一个有向无环图,如下图所示。其中,节点是 Processor,边是一对连通的端口,上一个 Processor 的输出端口连接到下一个 Processor 的输入端口。

  • Processor:Pipeline 的基本构建块,对数据进行处理的单元,例如过滤、排序、聚合等;
  • Port:连接 Processor,实现数据在 Pipeline 之间的流通,能够保存一个 Block 的数据。

一个 Processor 可能有 0 个或多个输入端口,以及 0 个或多个输出端口,其中,Source 和 Sink 是两类特殊的 Processor,Source 只有一个输出端口,没有输入端口,Sink 只有一个输入端口,没有输出端口。在 Pipeline 执行时,Processor 会从输入端口拉取数据,对数据进行处理,然后推到输出端口

下面,我们以一个具体的例子来看 ClickHouse 中 Pipeline 的结构是什么样的,例如,对于 SQL:

SELECT avg(length(URL)) + 1 FROM hits WHERE URL != '';

其对应的 Pipeline 如下图所示:

一个完整的 Pipeline 总是从 Source 开始,到 Sink 结束,在该 Pipeline 中,Read 对数据进行读取,之后 Filter 对数据进行过滤,Expression 进行表达式计算…,直到最后处理完成的数据输出到 Sink。

调度执行

业内常见的数据库计算模式如传统火山模型是基于算子间函数调用链来完成 SQL 计算的,预先分配线程与算子调用链进行绑定,如下图左侧所示,在执行过程中数据流与控制流绑定,无法灵活调整不同算子的并发,遇到阻塞事件也无法让出执行资源。

ClickHouse Pipeline 将数据流与控制流分离,所有执行线程归属于独立线程池,每个 Processor 可独立运行。执行过程中,Executor 将选出当前可执行的 Processor 进入待执行队列,由线程池中的工作线程进行运算处理。不仅 Processor 间是异步调度的,对资源依赖型 Processor(如 IO,RPC 等),其内部运行过程同样支持异步调度。当这类 Processor 的执行过程中遇到其依赖资源未就绪时,将转变为异步状态,中断当前执行并加入到异步等待队列中。Executor 在收集可执行 Processor 的过程中,同时将对异步等待队列中所有 Processor 进行 Epoll 轮询,把就绪的 Processor 转变为可执行状态,加入到待执行队列中。

动态扩展

ClickHouse Pipeline 具有极高的灵活性,能够在执行的过程中进行动态更改,创建新的 Processor,例如聚合的执行。

在 ClickHouse 中,聚合(Aggregating)的执行分为两个阶段:

  • consume 阶段:并行从所有输入流读取数据并预聚合到中间状态;
  • generate 阶段:合并预聚合的数据生成最终的聚合结果。

下图是 Aggregating 的执行示意图,左边为进行预聚合的 consume 阶段,右边为生成最终结果的 generate 阶段。在 consume 阶段,每一个 AggregatingTransform 不停从对应的输入流中拉取数据,预聚合到中间状态并写入对应的哈希表中;最后一个消费完输入流的 AggregatingTransform 会负责 Pipeline 的动态更改,其会创建一个新的 Processor:ConvertAggregatedToChunks,该 Processor 会从前面写入预聚合数据的所有哈希表中读取数据,并进行合并(Merge),生成最终的聚合结果。

表达式执行

表达式求值是 ClickHouse Pipeline 中的一个重要组成部分,ClickHouse 通过 ActionsDAG 来实现表达式计算。

ActionsDAG 是一个由表达式构成的有向无环图,其中的节点有下面几种类型:

  • INPUT:输入列
  • COLUMN:常量列
  • ALIAS:列的别名
  • ARRAY_JOIN:arrayJoin 函数
  • FUNCTION:其他普通函数

ActionsDAG 的边表示的是节点表达式之间的依赖推导关系。基于 DAG,方便对 Action 进行优化,例如:

  • 删除不需要的表达式
  • 字表达式编译
  • 节点拆分或合并

ActionsDAG 在执行时,会首先进行拓扑排序,得到表达式执行的序列,然后该表达式序列作用到 Block 上,实现按列进行计算。

ActionsDAG 相比 Greenplum,Doris,MySQL,PostgreSQL 等数据库具有一个核心优势:复用中间结果。对于 SELECT (a + b) * c, (a + b) / c FROM t; 这样一个查询,ActionsDAG 可以做到只对 a + b 求值一次。其他数据库无法复用结果的原因是使用树状结构进行表达式求值,实现简单但不灵活,ActionsDAG 通过 DAG 结构对表达式进行血缘推导,能够实现子节点复用。

在 ClickHouse 中查看执行 Pipeline 和 ActionsDAG

在 ClickHouse 中,可以使用 EXPLAIN 查询来查看一个 SQL 对应的执行 Pipeline 和 ActionsDAG。

例如,对于前面的查询:

SELECT avg(length(URL)) + 1 FROM hits WHERE URL != '';

在 ClickHouse 客户端通过 EXPLAIN 查询来查看对应的执行 Pipeline:

EXPLAIN PIPELINE
SELECT avg(length(URL)) + 1
FROM hits
WHERE URL != ''

Query id: d0844f55-fe5b-4030-a762-402b4f5c843e

┌─explain──────────────────────────────────┐
│ (Expression)                             │
│ ExpressionTransform                      │
│   (Aggregating)                          │
│   Resize 32 → 1                          │
│     AggregatingTransform × 32            │
│       StrictResize 32 → 32               │
│         (Expression)                     │
│         ExpressionTransform × 32         │
│           (Filter)                       │
│           FilterTransform × 32           │
│             (SettingQuotaAndLimits)      │
│               (ReadFromMergeTree)        │
│               MergeTreeThread × 32 0 → 1 │
└──────────────────────────────────────────┘

13 rows in set. Elapsed: 0.005 sec.

可以看到,在该查询的 Pipeline 中,最开始是从磁盘扫描数据的 ReadFromMergeTree,并行度为 32;之后是 FilterTransform -> ExpressionTransform -> AggregatingTransform,聚合完成之后,所有流合并成为一个流,并行度从 32 变为 1,完成最后的 ExpressionTransform。

下面,通过 EXPLAIN 来查看 ActionsDAG:

EXPLAIN actions = 1
SELECT avg(length(URL)) + 1
FROM hits
WHERE URL != ''

Query id: 6cae6628-8e14-4661-a86d-7e94aff31322

┌─explain────────────────────────────────────────────────────────────────────────────────────────┐
│ Expression ((Projection + Before ORDER BY))                                                    │
│ Actions: INPUT : 0 -> avg(length(URL)) Float64 : 0                                             │
│          COLUMN Const(UInt8) -> 1 UInt8 : 1                                                    │
│          FUNCTION plus(avg(length(URL)) :: 0, 1 :: 1) -> plus(avg(length(URL)), 1) Float64 : 2 │
│ Positions: 2                                                                                   │
│   Aggregating                                                                                  │
│   Keys:                                                                                        │
│   Aggregates:                                                                                  │
│       avg(length(URL))                                                                         │
│         Function: avg(UInt64) → Float64                                                        │
│         Arguments: length(URL)                                                                 │
│         Argument positions: 0                                                                  │
│     Expression (Before GROUP BY)                                                               │
│     Actions: INPUT : 0 -> URL String : 0                                                       │
│              FUNCTION length(URL :: 0) -> length(URL) UInt64 : 1                               │
│     Positions: 1                                                                               │
│       Filter (WHERE)                                                                           │
│       Filter column: notEquals(URL, '') (removed)                                              │
│       Actions: INPUT : 0 -> URL String : 0                                                     │
│                COLUMN Const(String) -> '' String : 1                                           │
│                FUNCTION notEquals(URL : 0, '' :: 1) -> notEquals(URL, '') UInt8 : 2            │
│       Positions: 0 2                                                                           │
│         SettingQuotaAndLimits (Set limits and quota after reading from storage)                │
│           ReadFromMergeTree                                                                    │
│           ReadType: Default                                                                    │
│           Parts: 1                                                                             │
│           Granules: 122071                                                                     │
└────────────────────────────────────────────────────────────────────────────────────────────────┘

27 rows in set. Elapsed: 0.002 sec.

可以看到,在该查询中,包含三个 ActionsDAG,最下面的 ActionsDAG 是进行过滤时使用到的,中间的 ActionsDAG 计算 length(URL) ,最后的 ActionsDAG 在聚合执行完之后执行,计算 avg(length(URL)) + 1

在输出中能够清晰的看出 ActionsDAG 中每一个列在 Block 中的位置, Action 之间的依赖关系,以及每一个 ActionsDAG 执行完成后返回的列等信息。以最下面的 ActionsDAG 为例,其包含三个节点:第一个节点为 INPUT 的 URL 列,位于 Block 中索引为 0 的位置;第二个节点为 COLUMN Const ,值为空字符串,位于 Block 中索引为 1 的位置;第三个节点为 FUNCION notEqual ,该节点依赖于前面两个节点,前面两个节点是该节点的输入,其输出为 UInt8 类型,位于 Block 中索引为 2 的位置。最后,该 ActionsDAG 会返回 Block 中索引为 0 和 2 的列,即 Positions: 0 2

总结

本文详细介绍了 ClickHouse 向量化执行引擎的实现。大量使用 SIMD 内置函数,以及高效的 Pipeline 执行引擎设计与实现,是 ClickHouse 拥有卓越查询性能的核心因素。

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

绮烟

暂无简介

文章
评论
27 人气
更多

推荐作者

微信用户

文章 0 评论 0

小情绪

文章 0 评论 0

ゞ记忆︶ㄣ

文章 0 评论 0

笨死的猪

文章 0 评论 0

彭明超

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文