parallel101 / simdtutor

x86-64 SIMD矢量优化系列教程
109 stars 9 forks source link

如何处理图像 uint8 格式的并行 #1

Closed chenxinfeng4 closed 1 year ago

chenxinfeng4 commented 1 year ago

UP主你好,我是在从事 CV 和生物交叉的研究生,做性能优化时关注到你的B站教程,非常受用。(还有就是莫顿序列,最近也准备搞起)。视频我都看完了,这次想请教SIMD。SIMD 是针对浮点运算,但是对图像处理不太友好。图像是(R、G、B) 排布的,例如Opencv定义的 为HxWx3 的numpy 矩阵或C++数组。别扭的点在于:

问题1: 请问对于这种图像 RGB24 数据,行内是怎样做加速?

问题2:代码请指导

我有个操作需要在原始图片上进行mask抠图。背景的区域替换为0。如下面 python 代码,但是性能特别差。所以用 SIMD 优化了一下,这个代码还很粗糙,只适配了图像的一个通道。还没与numpy对比过性能差异。

img[mask==0] = 0   #python numpy,方法1, 性能特别差

img = img * mask[..., None] #python numpy,方法2, 性能也差

用 c++ 的for循环做这件事情,收益也很小。所以在尝试 SIMD 。

//g++ -mavx2 fast_simd.cpp
#include <iostream>
#include <immintrin.h>

void vectorAnd(uint8_t* a, uint8_t* b, uint8_t* result, int size) {
    int width = 256/8; // =32
    uint8_t (* vectorA_)[width] = reinterpret_cast<uint8_t(*)[width]>(a);
    uint8_t (* vectorB_)[width] = reinterpret_cast<uint8_t(*)[width]>(b);
    uint8_t (* vectorResult_)[width] = reinterpret_cast<uint8_t(*)[width]>(result);

    // 每次处理 8 个元素, 
    int batch = size * 8 / 256;
    __m256i one = _mm256_set1_epi8(1);
    __m256i xor_mask = _mm256_set1_epi8(0xff);
    for (int i = 0; i < batch; i++) {
        // 使用 AVX2 指令进行矢量操作。因为没有 int8 乘法,只能用多步骤的位操作符代替
        __m256i vec_a = _mm256_loadu_si256((__m256i_u*) vectorA_[i]);
        __m256i vec_b = _mm256_loadu_si256((__m256i_u*) vectorB_[i]);
        __m256i b1 = _mm256_sub_epi8(vec_b, one);
        __m256i b2 = _mm256_xor_si256(b1, xor_mask); //vec_b '0'-> b2 0; vec_b '1' -> b2 0xFF
        __m256i resultVector = _mm256_and_si256(vec_a, b2);    
        // 将结果存储到结果数组中
        _mm256_storeu_si256((__m256i_u*) vectorResult_[i], resultVector);
    }

    // 处理剩余的不足 ? 个元素的情况
    int remainingElements = size * 8 % 256;
    for (int i = 0; i < remainingElements; i++) {
        // result[numVectors * 8 + i] = a[numVectors * 8 + i] + b[numVectors * 8 + i];
    }
}

int main(){
    const int size = 64;
    uint8_t img[size] = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16,
                       17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32,
                       33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48,
                       49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64}; 
    uint8_t mask[size] = {1, 0, 1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 1, 0, 0,
                       1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0,
                       1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0,
                       1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0};

    uint8_t result[size] = {0};
    std::cout << "pre" << std::endl;
    for (int i = 0; i < size; i++) {
        std::cout << (int)result[i] << " ";
    }
    std::cout << std::endl;
    std::cout << "\npost" << std::endl;
    vectorAnd(img, mask, result, size);
    for (int i = 0; i < size; i++) {
        std::cout << (int)result[i] << " ";
    }
}
chenxinfeng4 commented 1 year ago

小彭老师,我发现一个问题。c++ 的 SIMD 的矢量运算,没有python numpy快。 我把上面 SIMD 指令的代码(矢量矢量逐元素乘法)用 cython 封装导入python,性能测试居然不及 numpy 的 img = img * mask[..., None]。那么更进一步的问题

问题3:数值计算性能而言,底层 SIMD 难道不及高级封装的 MKL/BLAS/NumPy 。

archibate commented 1 year ago

给我看完整代码

无法顺畅的大口呼吸,是活着的最好证明

---原始邮件--- 发件人: @.> 发送时间: 2023年8月1日(周二) 晚上11:31 收件人: @.>; 抄送: @.***>; 主题: Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1)

小彭老师,我发现一个问题。c++ 的 SIMD 的矢量运算,没有python numpy快。 我把上面 SIMD 指令的代码(矢量矢量逐元素乘法)用 cython 封装导入python,性能测试居然不及 numpy 的 img = img * mask[..., None]。那么更进一步的问题

问题3:数值计算性能而言,底层 SIMD 难道不及高级封装的 MKL/BLAS/NumPy 。

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you are subscribed to this thread.Message ID: @.***>

archibate commented 1 year ago

用 cython 封装导入python

你是如何通过cython把数据从py传入cpp的,不会是通过list吧?

g++ -mavx2 fast_simd.cpp

没有 -O3?

int width = 256/8;

const int

for (int i = 0; i < remainingElements; i++)

for (int i = batch 256 / 8; i < batch 256 / 8 + remainingElements; i++)

底层 SIMD 难道不及高级封装的 MKL/BLAS/NumPy

热知识:MKL/BLAS/NumPy 底层也是会调用 SIMD 的,而且调用得比你随手写的专业,理论上无法通过手写超越他们。

请问对于这种图像 RGB24 数据,行内是怎样做加速?

x86 的 SIMD 确实对 AOS 数据不太友好,特别是 3 个元素一组的。

一般来说要用上 SIMD 优化,不仅代码需要修改,数据结构也需要修改,否则难产生提升。

解决方法:

  1. 数据结构保持不变,用加载 4 个的 _mm_loadu_ps 去加载 3 个,最后一个 float 浪费掉。
  2. 采用 RGBA 格式,Alpha 通道可以不用,但就放在那里占位置。
  3. 仍然只储存 RGB,但是图像存储时要按照 SOA 格式来存。

AOS(你目前的):vector<array<uint8, 3>> rgbs;

SOA(适合优化的):vector reds, greens, blues;

AOS(你目前的):RGB RGB RGB RGB

SOA(适合优化的):RRRR GGGG BBBB

以 float 的 RGB 图像计算灰度为例,AOS 要比 SOA 多一步矩阵转置,浪费时钟周期,且对于 uint8 则转置还会更复杂。

AOS(你目前的)的代码:

auto r = _mm_loadu_ps(rgbs + i*3);  // 不对齐加载,加载出来只用到前三位RGB,最后一位R'浪费掉了
auto g = _mm_loadu_ps(rgbs + i*3 + 3);  // 不对齐加载,加载出来只用到前三位R'G'B',最后一位R''浪费掉了
auto b = _mm_loadu_ps(rgbs + i*3 + 6);  // 不对齐加载,加载出来只用到前三位R''G''B'',最后一位R'''浪费掉了
auto a = _mm_setzero_ps();  // 用不到的 Alpha 通道,但为伺候 4x4 转置的帮手宏不得不弄个寄存器
_MM_TRANSPOSE4X4_PS(r, g, b, a); // xmmintrin.h里的一个帮手宏,用unpackhi和unpacklo自己实现也很容易
_mm_storeu_ps(grayscales + i, r + g + b);  // 不对齐写入,最后一位浪费,注意留空边界避免segfault(bound需要-4)
i += 3;

SOA(适合优化的)的代码:

auto r = _mm_load_ps(reds + i);
auto g = _mm_load_ps(greens + i);
auto b = _mm_load_ps(blues + i);
_mm_store_ps(grayscales + i, r + g + b);
i += 24;

采用了 SOA 后的 SIMD 代码简单了许多,每个循环的延迟低了不少,这样完了以后还可以做一些 unrolling 就算是基本逼近性能上限了。

另外现在 AVX512 支持了 mask_load 允许任意宽度(例如本来只能是 4xfloat 现在能 3xfloat)了,此外 arm 的 NEON 也有自带 3x4 矩阵转置的指令,相比 x86 对图像处理更加友好,arm 的 SVE 也可以通过设 mask 实现任意非 2 的幂宽度的 SIMD。

对了,numpy 的数据排布对用户不透明,里面搞不好就是已经运用了 SOA 甚至是 AOSOA 优化过的 data layout。

chenxinfeng4 commented 1 year ago

哪能通过 list 传递给 c++,我是在cython 获取ndarray 的数组指针,再传递给出c++。我现在在旅途中,晚些发详细代码。通过什么方式呢?我是否可以创一个github 仓库,把链接和安装方式附上?

昨天试过用 avx512 替换 avx256 指令集,但没任何性能提升。这个两个实现都会一并附上。

---- 回复的原邮件 ---- | 发件人 | @.> | | 日期 | 2023年08月02日 19:22 | | 收件人 | @.> | | 抄送至 | @.>@.> | | 主题 | Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1) |

你是如何通过cython把数据从py传入cpp的,不会是通过list吧?

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

archibate commented 1 year ago

有没有试过写不用simd的for循环版本作为对照,avx512在一些老cpu上是拆成两个avx256来执行的。

无法顺畅的大口呼吸,是活着的最好证明

---原始邮件--- 发件人: @.> 发送时间: 2023年8月2日(周三) 晚上8:38 收件人: @.>; 抄送: @.**@.>; 主题: Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1)

哪能通过 list 传递给 c++,我是在cython 获取ndarray 的数组指针,再传递给出c++。我现在在旅途中,晚些发详细代码。通过什么方式呢?我是否可以创一个github 仓库,把链接和安装方式附上?

昨天试过用 avx512 替换 avx256 指令集,但没任何性能提升。这个两个实现都会一并附上。

---- 回复的原邮件 ---- | 发件人 | @.> | | 日期 | 2023年08月02日 19:22 | | 收件人 | @.> | | 抄送至 | @.>@.> | | 主题 | Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1) |

你是如何通过cython把数据从py传入cpp的,不会是通过list吧?

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.> — Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you commented.Message ID: @.>

chenxinfeng4 commented 1 year ago

完整代码在 https://github.com/chenxinfeng4/parallel_multiple_uint8

chenxinfeng4 commented 1 year ago

在 Linux + Intel(R) Xeon(R) Silver 4314 CPU @ 2.40GHz的服务器上测得。

SIMD 略低于Numpy。

Numpy: 100%|██████████| 5000/5000 [00:14<00:00, 352.82it/s] C++ for loop: 100%|██████████| 5000/5000 [00:14<00:00, 349.51it/s] C++ AVX2: 100%|██████████| 5000/5000 [00:15<00:00, 329.48it/s] C++ AVX512: 100%|██████████| 5000/5000 [00:15<00:00, 327.07it/s]

但在 Windows + AMD + VC上,SIMD 快于 Numpy。(AVX512 编译失败)

Numpy: 100%|██████████| 1000/1000 [00:18<00:00, 52.85it/s] C++ for loop: 100%|██████████| 1000/1000 [00:16<00:00, 61.62it/s] C++ AVX2: 100%|██████████| 1000/1000 [00:09<00:00, 100.58it/s]

chenxinfeng4 commented 1 year ago

在 Linux + Intel(R) Xeon(R) Silver 4314 CPU @ 2.40GHz的服务器上测得。 SIMD 有多线程收益,threads(3) 可以获得 2.1x 速度提升。但是 AVX512 很垃。

C++ for loop: 100%|██████████| 10000/10000 [00:13<00:00, 747.35it/s] C++ AVX2: 100%|██████████| 10000/10000 [00:13<00:00, 753.44it/s] C++ AVX512: 100%|██████████| 10000/10000 [00:16<00:00, 600.03it/s]

archibate commented 1 year ago

> RGB24 的数据处理过程总, HWC 的格式处理特别慢。是否有合适的 SIMD 加速?

请看上面第一个回答中的代码,我已经编辑过内容,x86上HWC格式没有合理的加速方案,SIMD喜欢对齐,这就是一个不对齐劣质的格式,如果一定要用此种格式,建议加上A分量,用RGBA32配合4x4矩阵转置,另外你这种情况我在【【公开课】编译器优化与SIMD指令集(#4)-哔哩哔哩】 https://b23.tv/9iZ2JD2中已经介绍过,唯一最好的方法是换CHW

无法顺畅的大口呼吸,是活着的最好证明

---原始邮件--- 发件人: @.> 发送时间: 2023年8月3日(周四) 凌晨0:08 收件人: @.>; 抄送: @.**@.>; 主题: Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1)

完整代码在 https://github.com/chenxinfeng4/parallel_multiple_uint8

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you commented.Message ID: @.***>

chenxinfeng4 commented 1 year ago

感谢在RGB24 的问题上给出的明确答复。我今后将换轨到 CHW 格式处理数据,绕开对齐的问题。

其次,可以看到我上面做的性能测试。问题在于 1. numpy 自带的矢量化工具,甚至 c++ 编译器优化for 循环就足够好,仿佛不需要SIMD。2. 已经使用SIMD的情况下,再加入多线程对高性能计算的增益有多大,瓶颈在哪里?

archibate commented 1 year ago

1

c++ 编译器优化for循环,会生成使用 SIMD 指令集的代码,编译器优化里有一个pass叫做矢量化(vectorization),这个就是把标量的for循环自动转换为SIMD的。你说的“仿佛不需要SIMD”可能指的是“不需要手动SIMD”,对于mask这种简单的操作编译器确实可以优化到理论上限,但是有一些复杂的操作,例如我视频中演示的数组过滤(filter)和矢量化sin函数(fillsin),这时的情况对编译器可能是陌生的,他不知道怎么自动优化,那就需要手动SIMD优化。

2

一般来说,数据量大于 L2 cache 时,可以考虑并行优化,否则管理线程池的开销无法弥补并行带来的提升,反而不划算。通常并行以后最大的瓶颈已经不是计算,而是内存带宽。对于顺序访问且embrassive-parallel的(例如每个像素点逐个求灰度,或是你的mask操作)而言就只是内存带宽,没法再提升了。对于顺序访问但不embrassive-parallel的(例如reduce、scan、filter、sort)则可能需要结合《tbb》一课中的知识进行并行化。对于不是顺序访问,例如存在gather、scatter、transpose的情况(例如高斯模糊,中值滤波,矩阵转置),则需要缓存优化,包括莫顿码和循环分块。并行优化和SIMD优化是可以结合的,但并行后内存往往成为主要瓶颈,比考虑SIMD化优先级更高,甚至在并行以后CHW和HWC都是差不多的。例如如果给HWC加上A通道,要分两种情况讨论:数据量小于L2时会产生提升,因为SIMD喜欢对齐;但大于L2时反而会降低性能,因为对齐会导致访问不连续。

archibate commented 1 year ago

我在Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz上的测试结果:

Numpy: 100%|█████| 5000/5000 [00:12<00:00, 415.87it/s]
C++ for loop: 100%|█| 5000/5000 [00:11<00:00, 431.66it/s]
C++ AVX2: 100%|██| 5000/5000 [00:11<00:00, 431.93it/s]
C++ AVX512: 100%|█| 5000/5000 [00:11<00:00, 438.62it/s] // 我的CPU没有avx512,这个没法编译,所以这行数据其实是multiply_cpp_avx2的

对了,MSVC的自动矢量化很烂,wendous上C++forloop那个估计没有SIMD成功。 我猜测numpy可能背地里就是写的forloop,然后坐等gcc优化的,但是numpy在wendous上遇到了MSVC,MSVC不会优化,所以numpy在wendous上和cppforloop一样慢。 所以,在wendous平台上手动SIMD就更加必要了(和gcc/clang等擅长自动矢量化的专业编译器相比)。

archibate commented 1 year ago
        __m256i b1 = _mm256_sub_epi8(vec_mask, one);
        __m256i b2 = _mm256_xor_si256(b1, xor_mask);
        __m256i resultVector = _mm256_and_si256(vec_img, b2); 

看你这个xor是为了翻转全部bit?这里可以优化为:

        __m256i b1 = _mm256_sub_epi8(vec_mask, one);
        __m256i resultVector = _mm256_andnot_si256(b1, vec_img); 

其中andnot(x, y) = and(not(x), y)

archibate commented 1 year ago

另外,你的代码仓库中似乎只有nhw * mask,没有nhw * mask[None, ...]hwn * mask[..., None],能否也提供一下后两者的手动SIMD代码案例?我会尝试优化得和numpy一样快。

archibate commented 1 year ago

我自己做了 img_NHW * mask_HW[None, ...] 的实验:

static void multiply_cpp_avx2(char* img_NHW_ptr, bool* mask_HW_ptr, 
                  char* output_NHW_ptr, int size){
    __m256i one = _mm256_set1_epi8(1);
    __m256i xor_mask = _mm256_set1_epi8(static_cast<char>(0xff));

    const int width = 256 / 8; // =32 uint8 per batch
    size_t batch = size / width;

    uint8_t * vectorIMG = reinterpret_cast<uint8_t*>(img_NHW_ptr);
    uint8_t * vectorMASK = reinterpret_cast<uint8_t*>(mask_HW_ptr);
    uint8_t * vectorResult = reinterpret_cast<uint8_t*>(output_NHW_ptr);
    uint8_t * vectorResult_end = vectorResult + batch * width;

    while (vectorResult != vectorResult_end) {
        __m256i vec_img_r = _mm256_loadu_si256((__m256i*) vectorIMG);
        __m256i vec_img_g = _mm256_loadu_si256((__m256i*) (vectorIMG + size));
        __m256i vec_img_b  = _mm256_loadu_si256((__m256i*) (vectorIMG + size * 2));
        __m256i vec_mask = _mm256_loadu_si256((__m256i*) vectorMASK);
        __m256i b1 = _mm256_sub_epi8(vec_mask, one);
        __m256i result_r = _mm256_andnot_si256(b1, vec_img_r);
        __m256i result_g = _mm256_andnot_si256(b1, vec_img_g);
        __m256i result_b = _mm256_andnot_si256(b1, vec_img_b);
        _mm256_storeu_si256((__m256i*) vectorResult, result_r);
        _mm256_storeu_si256((__m256i*) (vectorResult + size), result_g);
        _mm256_storeu_si256((__m256i*) (vectorResult + size * 2), result_b);
        vectorIMG += width;
        vectorMASK += width;
        vectorResult += width;
    }
    // 暂时不考虑边角料,让数据恰好填充 batch
}
static void multiply_cpp_forloop(char* img_NHW_ptr, bool* mask_HW_ptr, 
                  char* output_NHW_ptr, int size){
    uint8_t * uptrIMG = reinterpret_cast<uint8_t*>(img_NHW_ptr);
    uint8_t * uptrMASK = reinterpret_cast<uint8_t*>(mask_HW_ptr);
    uint8_t * uptrResult = reinterpret_cast<uint8_t*>(output_NHW_ptr);
    for(int i=0; i<size; ++i){
        uptrResult[i] = uptrIMG[i] * uptrMASK[i];
        uptrResult[i+size] = uptrIMG[i+size] * uptrMASK[i];
        uptrResult[i+size*2] = uptrIMG[i+size*2] * uptrMASK[i];
    }
}

结果:

Numpy: 100%|████| 5000/5000 [00:00<00:00, 5171.27it/s]
C++ for loop: 100%|█| 5000/5000 [00:05<00:00, 917.09it/s]
C++ AVX2: 100%|█| 5000/5000 [00:00<00:00, 5459.61it/s]

可以看到这下手动SIMD取得了比forloop更好的结果。

chenxinfeng4 commented 1 year ago

等我晚上整理整理,然后同步github 仓库。

有类似top 的指令监测内存的瓶颈吗?生产环境中,任务单开的时候速度很快,xargs 并行的时候就降速了,查看 cpu 还有很多空闲的核心。我一直怀疑是内存带宽瓶颈,没法验证。

---- 回复的原邮件 ---- | 发件人 | @.> | | 日期 | 2023年08月03日 16:01 | | 收件人 | @.> | | 抄送至 | @.>@.> | | 主题 | Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1) |

另外,你的代码仓库中似乎只有nhw mask,没有nhw mask[..., None] 和 hwn * mask[..., None],能否也提供一下后两者的手动SIMD代码案例?我会尝试优化得和numpy一样快。

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

archibate commented 1 year ago

内存带宽验证要么用vtune,要么根据CPU flops数据计算理论上限it/s。不过一般来说uint8计算不太会卡在内存带宽,而且你这个数据量才1MB,才刚刚超过L2,L3还能hold住的情况下,不会出现需要访问主内存的情况。

archibate commented 1 year ago

HWC的情况我想了一下,其实是可以SIMD化的,用一个shuffle就行,但也仅仅是mask,对于不是mask这一特定操作的情况还需要额外想一遍,我试试看效果如何。

        // REGISTER    01234567890123456789012345678901 01234567890123456789012345678901 01234567890123456789012345678901
        // img       = rgbrgbrgbrgbrgbrgbrgbrgbrgbrgbrg brgbrgbrgbrgbrgbrgbrgbrgbrgbrgbr gbrgbrgbrgbrgbrgbrgbrgbrgbrgbrgb
        // maskorig  = 01234567890ABCDEFGHIJKLMNOPQRSTU
        // shufmask1 = 00011122233344455566677788899900
        // shufmask2 = 0AAABBBCCCDDDEEEFFFGGGHHHIIIJJJK
        // shufmask3 = KKLLLMMMNNNOOOPPPQQQRRRSSSTTTUUU
        // 则 img_1 & shufmask1 就能起到和分别对 HWC 格式的前三分之一 rgb 进行 mask 一样的效果,依次类推
chenxinfeng4 commented 1 year ago

我生产环境的代码并不全是上面的代码,还包括图像处理和视频读写。计算带宽理论占用比较难。

---- 回复的原邮件 ---- | 发件人 | @.> | | 日期 | 2023年08月03日 16:52 | | 收件人 | @.> | | 抄送至 | @.>@.> | | 主题 | Re: [parallel101/simdtutor] 如何处理图像 uint8 格式的并行 (Issue #1) |

内存带宽验证要么用vtune,要么根据CPU flops数据计算理论上限it/s。不过一般来说uint8不太会内存带宽,而且你这个数据量才1MB,才刚刚超过L2,L3还能hold住的情况下,不会出现需要访问主内存的情况。

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you authored the thread.Message ID: @.***>

archibate commented 1 year ago

优化了一下HWC:

static void multiply_cpp_avx2_full(char* img_HWN_ptr, bool* mask_HW_ptr, 
                  char* output_HWN_ptr, int size){
    __m256i one = _mm256_set1_epi8(1);
    __m256i xor_mask = _mm256_set1_epi8(static_cast<char>(0xff));

    const int width = 256 / 8; // =32 uint8 per batch
    size_t batch = size / width;

    uint8_t * vectorIMG = reinterpret_cast<uint8_t*>(img_HWN_ptr);
    uint8_t * vectorMASK = reinterpret_cast<uint8_t*>(mask_HW_ptr);
    uint8_t * vectorResult = reinterpret_cast<uint8_t*>(output_HWN_ptr);
    uint8_t * vectorResult_end = vectorResult + batch * width * 3;
    /* __m256i shuf1 = _mm256_setr_epi8(0 ,0 ,0 ,1 ,1 ,1 ,2 ,2 ,2 ,3 ,3 ,3 ,4 ,4 ,4 ,5 ,  // 0001112223334445 */
    /*                                  16,16,16,17,17,17,18,18,18,19,19,19,20,20,20,21); // GGGHHHIIIJJJKKKL */
    /* __m256i shuf2 = _mm256_setr_epi8(5 ,5 ,6 ,6 ,6 ,7 ,7 ,7 ,8 ,8 ,8 ,9 ,9 ,9 ,10,10,  // 55666777888999AA */
    /*                                  26,27,27,27,28,28,28,29,29,29,30,30,30,31,31,31); // QRRRSSSTTTUUUVVV */
    /* __m256i shuf3 = _mm256_setr_epi8(10,11,11,11,12,12,12,13,13,13,14,14,14,15,15,15,  // ABBBCCCDDDEEEFFF */
    /*                                  21,21,22,22,22,23,23,23,24,24,24,25,25,25,26,26); // LLMMMNNNOOOPPPQQ */
    __m256i shuf12 = _mm256_setr_epi8(0 ,0 ,0 ,1 ,1 ,1 ,2 ,2 ,2 ,3 ,3 ,3 ,4 ,4 ,4 ,5 ,
                                      5 ,5 ,6 ,6 ,6 ,7 ,7 ,7 ,8 ,8 ,8 ,9 ,9 ,9 ,10,10);
    __m256i shuf23 = _mm256_setr_epi8(5 ,5 ,6 ,6 ,6 ,7 ,7 ,7 ,8 ,8 ,8 ,9 ,9 ,9 ,10,10,
                                      10,11,11,11,12,12,12,13,13,13,14,14,14,15,15,15);
    __m256i shuf31 = _mm256_setr_epi8(10,11,11,11,12,12,12,13,13,13,14,14,14,15,15,15,
                                      0 ,0 ,0 ,1 ,1 ,1 ,2 ,2 ,2 ,3 ,3 ,3 ,4 ,4 ,4 ,5 );
    // REGISTER    01234567890123456789012345678901 01234567890123456789012345678901 01234567890123456789012345678901
    // img       = rgbrgbrgbrgbrgbrgbrgbrgbrgbrgbrg brgbrgbrgbrgbrgbrgbrgbrgbrgbrgbr gbrgbrgbrgbrgbrgbrgbrgbrgbrgbrgb
    // maskorig  = 0123456789ABCDEFGHIJKLMNOPQRSTUV
    // shufmask1 = 000111222333444555666777888999AA
    // shufmask2 = ABBBCCCDDDEEEFFFGGGHHHIIIJJJKKKL
    // shufmask3 = LLMMMNNNOOOPPPQQQRRRSSSTTTUUUVVV
    // 则 img_1 & shufmask1 就能起到和分别对 HWC 格式的前三分之一 rgb 进行 mask 一样的效果,依次类推
    while (vectorResult != vectorResult_end) {
        __m256i vec_mask = _mm256_loadu_si256((__m256i*) vectorMASK);
        vectorMASK += width;
        __m256i maskorig = _mm256_sub_epi8(vec_mask, one);
        __m256i shufmask2 = _mm256_shuffle_epi8(maskorig, shuf31);

        __m256i vec_img_1 = _mm256_loadu_si256((__m256i*) vectorIMG);
        vectorIMG += width;
        __m256i maskorig_ll = _mm256_broadcastsi128_si256(_mm256_castsi256_si128(maskorig));
        __m256i shufmask1 = _mm256_shuffle_epi8(maskorig_ll, shuf12);
        __m256i result_1 = _mm256_andnot_si256(shufmask1, vec_img_1);
        _mm256_storeu_si256((__m256i*) vectorResult, result_1);
        vectorResult += width;

        __m256i maskorig_hh = _mm256_broadcastsi128_si256(_mm256_extracti128_si256(maskorig, 1));
        __m256i vec_img_2 = _mm256_loadu_si256((__m256i*) vectorIMG);
        vectorIMG += width;
        __m256i result_2 = _mm256_andnot_si256(shufmask2, vec_img_2);
        _mm256_storeu_si256((__m256i*) vectorResult, result_2);
        vectorResult += width;

        __m256i shufmask3 = _mm256_shuffle_epi8(maskorig_hh, shuf23);
        __m256i vec_img_3  = _mm256_loadu_si256((__m256i*) vectorIMG);
        vectorIMG += width;
        __m256i result_3 = _mm256_andnot_si256(shufmask3, vec_img_3);
        _mm256_storeu_si256((__m256i*) vectorResult, result_3);
        vectorResult += width;
    }
    // 暂时不考虑边角料,让数据恰好填充 batch
}

结果如下:

Numpy: 100%|███████| 500/500 [00:02<00:00, 238.25it/s]
C++ for loop: 100%|█| 500/500 [00:00<00:00, 863.75it/s
C++ AVX2: 100%|█| 5000/5000 [00:00<00:00, 7441.06it/s]
C++ AVX2 full: 100%|█| 5000/5000 [00:00<00:00, 7316.90it/s]
archibate commented 1 year ago

添加了并行版本的测试案例:

Numpy: 100%|█████████████████████████████████████████████████| 500/500 [00:02<00:00, 233.74it/s]
C++ for loop: 100%|██████████████████████████████████████████| 500/500 [00:00<00:00, 841.51it/s]
C++ AVX2: 100%|███████████████████████████████████████████| 5000/5000 [00:00<00:00, 7465.66it/s]
C++ AVX2 full: 100%|██████████████████████████████████████| 5000/5000 [00:00<00:00, 7301.23it/s]
C++ for loop (OpenMP): 100%|████████████████████████████████| 500/500 [00:00<00:00, 2168.63it/s]
C++ AVX2 (OpenMP): 100%|█████████████████████████████████| 5000/5000 [00:00<00:00, 12758.66it/s]
C++ AVX2 full (OpenMP): 100%|████████████████████████████| 5000/5000 [00:00<00:00, 14928.05it/s]
    if (!parallel) {
        multiply_by(img_HWN_ptr, mask_HW_ptr, 
                    output_ptr, size);
    } else {
        const int chunk = 65536;
        #pragma omp parallel for
        for (int i = 0; i < size; i += chunk) {
            multiply_by(img_HWN_ptr + i*3, mask_HW_ptr + i,
                        output_ptr + i*3, std::min(chunk, size - i));
        }
    }

PS:我具有12个逻辑核,6个物理核。

chenxinfeng4 commented 1 year ago

看起来速度得到了不可思议的提升 (200 -> 7K 单核 -> 12K 多核) 。我一时间看不太懂,让我消化消化。大概是取了一个公约数,把 IMG 按 RGBRGBRGBRGB... 排布占满3个寄存器。把 MASK 广播到同样的3个寄存器尺寸。然后再做矢量计算。这种算法也是相当变态了。

archibate commented 1 year ago

对的,把mask通过shuffle扩展为三个mask,运用这种方法最后我们的HWC版本反而比CHW版本还要快了,你如果还有什么HWC格式的算法需要优化,欢迎发给小彭老师来研究研究。

chenxinfeng4 commented 1 year ago

谢谢小彭老师细心解答