目录

TVM: 1D convolution CPU Optimization

本博客使用claude-3.7-sonet翻译,如有冲突请优先参考英文原文

这篇博客介绍了使用 TVM 对 CPU conv1D 逐步进行优化的例子,包含 parallelization, loop tiling, vectorization, unrolling 等技巧。

环境配置:

Google colab CPU环境。

架构:                   x86_64
  CPU运行模式:           32-bit, 64-bit
  地址大小:              46位物理地址, 48位虚拟地址
  字节序:                小端序
CPU数量:                 2
  在线CPU列表:           0,1
厂商ID:                  GenuineIntel
  型号名称:              Intel(R) Xeon(R) CPU @ 2.00GHz
    CPU家族:             6
    型号:                85
    每核心线程数:         2
    每插槽核心数:         1
    插槽数:              1

我们基于以下设置进行性能测试:

M = 4096
N = 128
dtype = "float32"
np.random.seed(0)
a_np = np.random.rand(M).astype(dtype)
w_np = np.random.rand(N).astype(dtype)
ref = np.convolve(a_np, w_np)

这个实现创建了一个大小为(M + N - 1)的规约轴,并在if_then_else内使用边界检查。运行时间为24.3525 ms

# 朴素基准
def make_conv1d_cpu_scheduler_naive(M, N):
    A = te.placeholder((M,), name="A")  # 输入张量占位符
    W = te.placeholder((N,), name="W")  # 权重张量占位符

    k = te.reduce_axis((0, M + N - 1), "k")   # k在[0, M+N-1)范围内
    B = te.compute(
        (M + N - 1,),   # 输出形状,n从(0, M + N - 1)
        # if_then_else: 如果满足"任意"条件,返回0,否则返回A[k] * W[n - k]
        lambda n: te.sum(tvm.tir.if_then_else(
            tvm.tir.any(k < 0, k >= M, n - k < 0, n - k >= N),
            tvm.tir.const(0.0, "float32"),
            A[k] * W[n - k]), axis=k),
        name="B",
    )
    s = te.create_schedule(B.op)
    print("=" * 100)
    print(tvm.lower(s, [A, W, B], simple_mode=True))
    print("=" * 100)

    return s, A, W, B

IR:

def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
    T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
    for n in range(4223):
        B[n] = T.float32(0)
        for k in range(4223):
            cse_var_1: T.int32 = n - k  # W的位置
            # 如果 4096 <= k 或 cse_var_1 < 0 或 128 <= cse_var_1: += 0
            # 否则: += A[k] * W[cse_var_1]
            B[n] = B[n] + T.if_then_else(4096 <= k or cse_var_1 < 0 or 128 <= cse_var_1, T.float32(0), A[k] * W[cse_var_1])

这是计算一维卷积操作的最简单循环。

这里我们将规约轴限制在[0, M)范围内,消除了一些不必要的检查。这略微提高了速度至23.0471 ms

代码:

# 优化v0:缩小k的范围以减少if-else
def make_conv1d_cpu_scheduler_v0(M, N):
    A = te.placeholder((M,), name="A", dtype="float32")
    W = te.placeholder((N,), name="W", dtype="float32")

    k = te.reduce_axis((0, M), "k")   # k在[0, M)范围内
    B = te.compute(
        (M + N - 1,),
        lambda n: te.sum(tvm.tir.if_then_else(
            tvm.tir.any(k < 0, k >= M, n - k < 0, n - k >= N),
            tvm.tir.const(0.0, "float32"),
            A[k] * W[n - k]), axis=k),
        name="B",
    )

    s = te.create_schedule(B.op)

    return s, A, W, B

IR:

def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
  T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
  for n in range(4223):
      B[n] = T.float32(0)
      for k in range(4096):
          cse_var_1: T.int32 = n - k
          # 如果 cse_var_1 < 0 或 128 <= cse_var_1: += 0
          # 否则: += A[k] * W[cse_var_1]
          B[n] = B[n] + T.if_then_else(cse_var_1 < 0 or 128 <= cse_var_1, T.float32(0), A[k] * W[cse_var_1])

我们可以看到k的内循环缩小为for k in range(4096),并且我们也移除了冗余的4096 <= k条件。

我们对外循环使用parallel()。每个输出条目(B[n])可以独立计算,现在的结果是22.9158 ms

代码:

# 优化v1: v0 + 并行
def make_conv1d_cpu_scheduler_v1(M, N):
    s, A, W, B = make_conv1d_cpu_scheduler_v0(M, N, False)
    n_axis = B.op.axis[0]   # 输出轴
    s[B].parallel(n_axis)   # 对输出轴并行

    return s, A, W, B

IR:

@T.prim_func
def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
    T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
    for n in T.parallel(4223):
        B[n] = T.float32(0)
        for k in range(4096):
            cse_var_1: T.int32 = n - k
            B[n] = B[n] + T.if_then_else(cse_var_1 < 0 or 128 <= cse_var_1, T.float32(0), A[k] * W[cse_var_1])

值得注意的是,我们使用T.parallel(4223)来并行计算。通常这样做可以获得很大的性能提升。

注意:在colab上,我们只有一个CPU核心和每核两个线程,所以这种并行效果并不是很好。

我们进一步拆分输出轴,并对内部部分应用vectorize以利用SIMD指令。这带来了显著的加速,达到16.0384 ms

代码:

# 优化v2: v0 + 并行 + 拆分 + 向量化
def make_conv1d_cpu_scheduler_v2(M, N, factor=8):
    s, A, W, B = make_conv1d_cpu_scheduler_v0(M, N, False)
    n_axis = B.op.axis[0]
    # AVX2,向量化带宽=256。8 * float32 或 16 * float16
    outer, inner = s[B].split(n_axis, factor=factor)
    s[B].parallel(outer)
    s[B].vectorize(inner)   # 使用CPU SIMD

    return s, A, W, B

IR:

@T.prim_func
def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
    T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
    for n_outer in T.parallel(528):
        for n_inner_s in range(8):
            if T.likely(n_outer * 8 + n_inner_s < 4223):
                B[n_outer * 8 + n_inner_s] = T.float32(0)
        for k, n_inner_s in T.grid(4096, 8):
            if T.likely(n_outer * 8 + n_inner_s < 4223):
                cse_var_2: T.int32 = n_outer * 8 + n_inner_s  # 输出位置
                cse_var_1: T.int32 = cse_var_2 - k
                # 如果 cse_var_1 < 0 或 128 <= cse_var_1: += 0
                # 否则: += A[k] * W[cse_var_1]
                B[cse_var_2] = B[cse_var_2] + T.if_then_else(cse_var_1 < 0 or 128 <= cse_var_1, T.float32(0), A[k] * W[cse_var_1])

最重要的优化是将循环从T.parallel(4223)改为for n_outer in T.parallel(528)for n_inner_s in range(8),现在我们可以更好地:

  • 命中缓存
  • 减少线程开销
  • 使用CPU的SIMD技术(编译器隐式实现)

T.likely(n_outer * 8 + n_inner_s < 4223)也向编译器暗示对分支进行更多优化,使其性能更好(编译器分支预测)。

我们还拆分了规约轴k并展开内部部分以减少循环开销。这帮助我们达到了14.5967 ms

代码:

# 优化v3: v2 + k轴拆分 + 展开
def make_conv1d_cpu_scheduler_v3(M, N, factor=8):
    s, A, W, B = make_conv1d_cpu_scheduler_v2(M, N, factor, False)

    k_axis = B.op.reduce_axis[0]
    k_outer, k_inner = s[B].split(k_axis, factor=factor)
    s[B].unroll(k_inner)  # 展开以减少循环开销

    return s, A, W, B

IR:

@T.prim_func
def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
    T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
    for n_outer in T.parallel(528):
        for n_inner_s in range(8):
            if T.likely(n_outer * 8 + n_inner_s < 4223):
                B[n_outer * 8 + n_inner_s] = T.float32(0)
        # 将k拆分为512 * 8
        for k_outer in range(512):
            for n_inner_s in range(8):
                if T.likely(n_outer * 8 + n_inner_s < 4223): 
                    cse_var_3: T.int32 = k_outer * 8    # 输入位置
                    cse_var_2: T.int32 = n_outer * 8 + n_inner_s  # 输出位置
                    cse_var_1: T.int32 = cse_var_2 - cse_var_3  # 权重位置
                    # 如果 n_outer - k_outer < 0 或 128 <= cse_var_1: += 0
                    # 否则: += A[cse_var_3] * W[cse_var_1]
                    B[cse_var_2] = B[cse_var_2] + T.if_then_else(n_outer - k_outer < 0 or 128 <= cse_var_1, T.float32(0), A[cse_var_3] * W[cse_var_1])
            for n_inner_s in range(8):
                if T.likely(n_outer * 8 + n_inner_s < 4223):
                    cse_var_6: T.int32 = k_outer * 8
                    cse_var_5: T.int32 = n_outer * 8 + n_inner_s
                    cse_var_4: T.int32 = cse_var_5 - cse_var_6
                    B[cse_var_5] = B[cse_var_5] + T.if_then_else(n_outer - k_outer < 0 or 129 <= cse_var_4, T.float32(0), A[cse_var_6 + 1] * W[cse_var_4 - 1])
            # ... 相同的6个展开块

这里我们在k拆分后进行了循环展开for k_outer in range(512):然后对k_inner进行展开。这使编译器可以进行更好的优化。

我们重写卷积以在[0, N)范围内求和,检查(n-k)在A中是否有效。然后我们结合并行化、拆分和向量化。这大大减少了边界开销,实现了我们最快的手动调度(0.5661 ms)。

代码:

# 优化v4: 计算重构(最小化if-else) + 并行 + 拆分 + 向量化
def make_conv1d_cpu_scheduler_v4(M, N, factor=8):
    A = te.placeholder((M,), name="A", dtype="float32")
    W = te.placeholder((N,), name="W", dtype="float32")
    k = te.reduce_axis((0, N), name="k")

    B = te.compute(
        (M + N - 1,),
        lambda n: te.sum(
            tvm.tir.if_then_else(
                tvm.tir.all(n - k >= 0, n - k < M),
                A[n - k] * W[k],
                tvm.tir.const(0.0, "float32")
            ),
            axis=k
        ),
        name="B"
    )
    s = te.create_schedule(B.op)
    n_axis = B.op.axis[0]
    outer, inner = s[B].split(n_axis, factor=factor)
    s[B].parallel(outer)
    s[B].vectorize(inner)   # CPU SIMD使用

    return s, A, W, B

IR:

@T.prim_func
def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
    T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
    for n_outer in T.parallel(528):
        for n_inner_s in range(8):
            if T.likely(n_outer * 8 + n_inner_s < 4223):
                B[n_outer * 8 + n_inner_s] = T.float32(0)
        # k只循环[0, 128]
        for k, n_inner_s in T.grid(128, 8):
            if T.likely(n_outer * 8 + n_inner_s < 4223):
                cse_var_2: T.int32 = n_outer * 8 + n_inner_s  # 输出位置
                cse_var_1: T.int32 = cse_var_2 - k    # 输入位置
                # 如果 0 <= cse_var_1 且 cse_var_1 < 4096: += A[cse_var_1] * W[k]
                # 否则: += 0
                B[cse_var_2] = B[cse_var_2] + T.if_then_else(0 <= cse_var_1 and cse_var_1 < 4096, A[cse_var_1] * W[k], T.float32(0))

最初,我们通过B[n] = Σ(k=0→4095) A[k] * W[n-k]计算。现在我们改为B[n] = Σ(k=0→127) A[n-k] * W[k]

if语句也优化为if 0 <= cse_var_1 and cse_var_1 < 4096,所有这些结合起来带来了巨大的性能提升。

在 v5 中添加两条 stage pragma,强制 TVM 将八通道内层循环展开为直线代码。

def make_conv1d_cpu_scheduler_v5(M, N, factor=8):
    A = te.placeholder((M,), name="A", dtype="float32")
    W = te.placeholder((N,), name="W", dtype="float32")
    k = te.reduce_axis((0, N), name="k")
    B = te.compute(
        (M + N - 1,),
        lambda n: te.sum(
            tvm.tir.if_then_else(
                tvm.tir.all(n - k >= 0, n - k < M),
                A[n - k] * W[k],
                tvm.tir.const(0.0, "float32")
            ),
            axis=k
        ),
        name="B"
    )

    s = te.create_schedule(B.op)
    n_axis = B.op.axis[0]

    outer, inner = s[B].split(n_axis, factor=factor)
    s[B].parallel(outer)
    s[B].vectorize(inner)
    k_axis = B.op.reduce_axis[0]
    s[B].pragma(outer, "auto_unroll_max_step", 16)
    s[B].pragma(outer, "unroll_explicit", True)

    return s, A, W, B

IR:

def main(A: T.Buffer((4096,), "float32"), W: T.Buffer((128,), "float32"), B: T.Buffer((4223,), "float32")):
    T.func_attr({"from_legacy_te_schedule": T.bool(True), "tir.noalias": T.bool(True)})
    for n_outer in T.parallel(528):
        cse_var_1: T.int32 = n_outer * 8
        B[cse_var_1] = T.float32(0)
        B[cse_var_1 + 1] = T.float32(0)
        B[cse_var_1 + 2] = T.float32(0)
        B[cse_var_1 + 3] = T.float32(0)
        B[cse_var_1 + 4] = T.float32(0)
        B[cse_var_1 + 5] = T.float32(0)
        B[cse_var_1 + 6] = T.float32(0)
        if T.likely(n_outer < 527):
            B[cse_var_1 + 7] = T.float32(0)
        for k in range(128):
            cse_var_8: T.int32 = cse_var_1 - k
            cse_var_7: T.int32 = cse_var_1 + 6
            cse_var_6: T.int32 = cse_var_1 + 5
            cse_var_5: T.int32 = cse_var_1 + 4
            cse_var_4: T.int32 = cse_var_1 + 3
            cse_var_3: T.int32 = cse_var_1 + 2
            cse_var_2: T.int32 = cse_var_1 + 1
            B[cse_var_1] = B[cse_var_1] + T.if_then_else(0 <= cse_var_8 and cse_var_8 < 4096, A[cse_var_8] * W[k], T.float32(0))
            B[cse_var_2] = B[cse_var_2] + T.if_then_else(-1 <= cse_var_8 and cse_var_8 < 4095, A[cse_var_2 - k] * W[k], T.float32(0))
            B[cse_var_3] = B[cse_var_3] + T.if_then_else(-2 <= cse_var_8 and cse_var_8 < 4094, A[cse_var_3 - k] * W[k], T.float32(0))
            # ... same for 4 5 6 7
            if T.likely(n_outer < 527):
                cse_var_9: T.int32 = cse_var_1 + 7
                B[cse_var_9] = B[cse_var_9] + T.if_then_else(-7 <= cse_var_8 and cse_var_8 < 4089, A[cse_var_9 - k] * W[k], T.float32(0))

更大的直线代码块仍可容纳于 L1-I 缓存中,因此省去的循环簿记和分支误预测成本弥补了代码体积的增长——同样的 4096×128 卷积从 0.566 ms 下降到 0.411 ms

我们定义了一个参数化搜索空间(i和k的拆分、向量化开关、展开因子等),并让AutoTVM运行。它尝试不同的配置,测量它们,并选择最佳配置。

@autotvm.template("conv1d_auto_tune")
def conv1d_auto_tune(M, N):
    ...
    # 使用AutoTVM配置(cfg)定义和应用拆分、向量化、展开
    ...

最佳结果:0.70 ms

所有时间都是在x86 CPU(“llvm"目标)上以毫秒为单位。关键版本:

实现时间 (ms)相对朴素的加速比
朴素~24.351.0×
v0 (较小的k)~23.051.06×
v1 (并行)~22.921.06×
v2 (+向量化)~16.041.52×
v3 (+展开)~14.601.67×
v4 (重构)~0.5743.03×
v5 (Combined)~0.4159.39×
AutoTVM (50次尝试)~0.7034.79×
NumPy~0.21~116× (vs. 朴素)
  1. 朴素 vs. v5: 我们看到通过逐步减少开销并利用并行性和SIMD,时间从24.35 ms大幅下降到0.41 ms。
  2. AutoTVM: 自动调优收敛到0.70 ms—在这种配置下略慢于我们最好的手动调度,但仍然比朴素实现好得多。
  3. NumPy参考: NumPy为~0.21 ms,表明高度优化的库(如MKL)仍然可以更快。
  • 笔记本(本博客使用的所有代码):链接
  • TVM论文摘要:链接