I have a function that copies binary data from one area to another, but only if the bytes are different from a specific value. Here is a code sample:
/*
 * Copy bytes from src to dest, skipping bytes equal to `ignore`.
 * Destination bytes at skipped positions keep their previous value.
 *
 * src    - source buffer, `size` bytes (read-only; now const-qualified,
 *          which is backward compatible for all callers)
 * dest   - destination buffer, `size` bytes
 * size   - number of bytes to process
 * ignore - byte value that must not be copied
 */
void copy_if(const char* src, char* dest, size_t size, char ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        if (src[i] != ignore)
        {
            dest[i] = src[i];
        }
    }
}
The problem is that this is too slow for my current need. Is there a way to obtain the same result in a faster way?
Update: Based on answers I tried two new implementations:
/*
 * Branch-free copy-if intended for compiler auto-vectorization:
 * both the load of dest[i] and the store to dest[i] are unconditional,
 * so the compiler does not have to invent a conditional store.
 *
 * Fix: the original used `char` locals for `uint8_t` data; converting a
 * byte >= 0x80 to a (possibly signed) char is implementation-defined.
 * Using uint8_t throughout is portable and behaves identically.
 */
void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{
    const uint8_t skip = (uint8_t)ignore; /* compare in the buffer's type */
    for (size_t i = 0; i < size; ++i)
    {
        uint8_t temps = src[i];
        uint8_t tempd = dest[i];
        dest[i] = (temps == skip) ? tempd : temps;
    }
}
/* SSE2 copy-if: processes 16 bytes per iteration with a masked store.
 * _mm_maskmoveu_si128 writes only the byte lanes whose mask high bit is
 * set, so destination bytes where src matches `ignore` are left untouched.
 * NOTE(review): per the Intel SDM, MASKMOVDQU carries a non-temporal
 * store hint, which can make it slower than an explicit load/blend/store
 * sequence on cached data. */
void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
const __m128i vignore = _mm_set1_epi8(ignore);
size_t i;
/* Vector loop: runs while a full 16-byte chunk remains. */
for (i = 0; i + 16 <= size; i += 16)
{
__m128i v = _mm_loadu_si128((__m128i *)&src[i]);
/* 0xFF in lanes equal to `ignore`, 0x00 elsewhere. */
__m128i vmask = _mm_cmpeq_epi8(v, vignore);
/* Invert so 0xFF marks the lanes that SHOULD be stored. */
vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
_mm_maskmoveu_si128(v, vmask, (char *)&dest[i]);
}
/* Scalar tail for the last size % 16 bytes. */
for (; i < size; ++i)
{
if (src[i] != ignore)
dest[i] = src[i];
}
}
And I got the following results:
Naive:
Duration: 2.04844s
Vectorized:
Pass: PASS
Duration: 3.18553s
SIMD:
Pass: PASS
Duration: 0.481888s
I guess my compiler failed to vectorize it (latest MSVC), but the SIMD solution is good enough, thanks!
Update (bis): I managed to vectorize it using some pragma directives for my compiler (MSVC), and indeed it is actually faster than SIMD. Here is the final code:
/* MSVC auto-vectorized version: the pragmas ask the compiler to
 * vectorize/parallelize the loop (hint_parallel(0) = as wide as it can)
 * and to ignore assumed loop-carried dependencies (ivdep).
 * NOTE(review): `int i` is compared against the unsigned `size`
 * (signed/unsigned comparison) and limits the usable buffer to INT_MAX
 * bytes; kept as-is because, per the author, MSVC refuses to
 * parallelize the loop when the counter is unsigned. */
void copy_if_vectorized(const uint8_t* src, uint8_t* dest, size_t size, char ignore)
{
#pragma loop(hint_parallel(0))
#pragma loop(ivdep)
for (int i = 0; i < size; ++i) // No parallelization if i is unsigned, but more than 2 GB of data is very unlikely
{
/* Unconditional load and store of dest[i]: the compiler never has to
 * invent a conditional store, so it is free to vectorize. */
char temps = src[i];
char tempd = dest[i];
dest[i] = temps == ignore ? tempd : temps;
}
}
gcc vectorizes the following code:
#include <stddef.h>
/* Branch-free copy-if written so the compiler can auto-vectorize it:
 * every iteration loads dest[i] and stores dest[i] unconditionally, so
 * no conditional store has to be invented (which the compiler may not
 * do in a multi-threaded program). The select picks the old dest byte
 * when the source byte equals `ignore`. */
void copy_if(char* src, char* dest, size_t size, char ignore)
{
for (size_t i = 0; i < size; ++i)
{
/* Read both sides unconditionally. */
char temps = src[i];
char tempd = dest[i];
/* Keep the old dest byte when src matches `ignore`. */
dest[i] = temps == ignore ? tempd : temps;
}
}
Note that both the load from- and the assignment to dest[i]
are unconditional, so the compiler is not restricted by the prohibition against inventing stores in a multi-threaded program.
Edit for a less ancient compiler and processor, and godbolt links:
x86-64 gcc 11.1 compiles the code to the following with -O3 -mavx512f -mavx512bw
, producing an aligned loop processing 64 bytes at a time:
.L5:
vmovdqu8 (%rdi,%rax), %zmm2
vpcmpb $4, %zmm0, %zmm2, %k1
vmovdqu8 %zmm2, (%rsi,%rax){%k1}
addq $64, %rax
cmpq %rax, %r8
jne .L5
This compiler also does well for gcc -std=gnu11 -O3 -mavx2
, processing 32 bytes at a time:
.L5:
vpcmpeqb (%rdi,%rax), %ymm1, %ymm0
vmovdqu (%rdi,%rax), %ymm2
vpblendvb %ymm0, (%rsi,%rax), %ymm2, %ymm0
vmovdqu %ymm0, (%rsi,%rax)
addq $32, %rax
cmpq %rax, %r8
jne .L5
In general, modern compilers do well for any processor architecture with a vector unit.
Old compiler (gcc 4.8.4), old processor (no AVX512), old answer:
For -march=core-avx2
, the generated assembly contains this vectorized loop, working on 32 bytes at a time:
.L9:
vmovdqu (%rdi,%rcx), %ymm1
addq $1, %r10
vmovdqu (%rsi,%rcx), %ymm2
vpcmpeqb %ymm0, %ymm1, %ymm3
vpblendvb %ymm3, %ymm2, %ymm1, %ymm1
vmovdqu %ymm1, (%rsi,%rcx)
addq $32, %rcx
cmpq %r10, %r8
ja .L9
For generic x86-64, the generated assembly contains this vectorized loop, working on 16 bytes at a time:
.L9:
movdqu (%rdi,%r8), %xmm3
addq $1, %r10
movdqa %xmm3, %xmm1
movdqu (%rsi,%r8), %xmm2
pcmpeqb %xmm0, %xmm1
pand %xmm1, %xmm2
pandn %xmm3, %xmm1
por %xmm2, %xmm1
movdqu %xmm1, (%rsi,%r8)
addq $16, %r8
cmpq %r9, %r10
jb .L9
For armv7l-neon, clang-3.7
generates the following loop, working on 16 bytes at a time:
.LBB0_9: @ %vector.body
@ =>This Inner Loop Header: Depth=1
vld1.8 {d18, d19}, [r5]!
subs.w lr, lr, #16
vceq.i8 q10, q9, q8
vld1.8 {d22, d23}, [r4]
vbsl q10, q11, q9
vst1.8 {d20, d21}, [r4]!
bne .LBB0_9
So, the code is not only more readable than assembly or intrinsics, it's also portable to multiple architectures and compilers. New architectures and instruction-set extensions can easily be utilized by recompilation.
Here is an example using SSE2 intrinsics to exploit the maskmovdqu instruction. The SIMD version seems to run at around 2x the speed of the original version on a Haswell CPU (code compiled with clang):
#include <stdint.h>    // uint8_t
#include <stdio.h>
#include <stdlib.h>    // malloc, free, rand
#include <string.h>
#include <emmintrin.h> // SSE2
#include <sys/time.h>  // gettimeofday
/* Scalar reference implementation: walk both buffers byte by byte and
 * copy every source byte that is not equal to `ignore`; skipped
 * positions in dest keep their previous contents. */
void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    const uint8_t *stop = src + size;
    while (src < stop)
    {
        uint8_t byte = *src++;
        if (byte != ignore)
            *dest = byte;
        ++dest;
    }
}
/* SSE2 copy-if using a masked store: _mm_maskmoveu_si128 writes only
 * the byte lanes whose mask high bit is set, leaving destination bytes
 * that correspond to `ignore` source bytes untouched.
 * NOTE(review): per the Intel SDM, MASKMOVDQU has a non-temporal store
 * hint; the SSE4 load/blend/store variant later in this answer measures
 * faster on cached data. */
void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
const __m128i vignore = _mm_set1_epi8(ignore);
size_t i;
/* Vector loop over full 16-byte chunks. */
for (i = 0; i + 16 <= size; i += 16)
{
__m128i v = _mm_loadu_si128((__m128i *)&src[i]);
/* 0xFF in lanes equal to `ignore`, 0x00 elsewhere. */
__m128i vmask = _mm_cmpeq_epi8(v, vignore);
/* Invert so 0xFF marks lanes that SHOULD be stored. */
vmask = _mm_xor_si128(vmask, _mm_set1_epi8(-1));
_mm_maskmoveu_si128 (v, vmask, (char *)&dest[i]);
}
/* Scalar tail for the remaining size % 16 bytes. */
for ( ; i < size; ++i)
{
if (src[i] != ignore)
dest[i] = src[i];
}
}
/* Benchmark harness: runs `copy_if(src, dest, size, ignore)` kLoops
 * times, executing `init` before each iteration OUTSIDE the timed
 * region (so the destination buffer can be reset without polluting the
 * measurement), accumulates wall-clock time in milliseconds via
 * gettimeofday (~1 us resolution), and prints the mean time per element
 * in nanoseconds. Wrapped in do { } while (0) so it behaves as a single
 * statement. */
#define TIME_IT(init, copy_if, src, dest, size, ignore) \
do { \
const int kLoops = 1000; \
struct timeval t0, t1; \
double t_ms = 0.0; \
\
for (int i = 0; i < kLoops; ++i) \
{ \
init; \
gettimeofday(&t0, NULL); \
copy_if(src, dest, size, ignore); \
gettimeofday(&t1, NULL); \
t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
} \
printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
} while (0)
/* Test driver: verifies copy_if_SSE against the scalar reference on
 * random data, then times both implementations.
 * Fixes over the original: malloc results are checked, buffers are
 * freed, and each timed run resets the buffer it actually writes to
 * (the copy_if_ref timing previously reset dest_test while writing
 * dest_ref). */
int main()
{
    const size_t N = 10000000;
    uint8_t *src = malloc(N);
    uint8_t *dest_ref = malloc(N);
    uint8_t *dest_init = malloc(N);
    uint8_t *dest_test = malloc(N);
    if (!src || !dest_ref || !dest_init || !dest_test)
    {
        fprintf(stderr, "allocation failed\n");
        free(src);
        free(dest_ref);
        free(dest_init);
        free(dest_test);
        return 1;
    }
    /* Random source and destination contents so skipped bytes are visible. */
    for (size_t i = 0; i < N; ++i)
    {
        src[i] = (uint8_t)rand();
        dest_init[i] = (uint8_t)rand();
    }
    memcpy(dest_ref, dest_init, N);
    copy_if_ref(src, dest_ref, N, 0x42);
    memcpy(dest_test, dest_init, N);
    copy_if_SSE(src, dest_test, N, 0x42);
    printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");
    /* Reset the buffer each timed function writes to. */
    TIME_IT(memcpy(dest_ref, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);
    free(src);
    free(dest_ref);
    free(dest_init);
    free(dest_test);
    return 0;
}
Compile and test:
$ gcc -Wall -msse2 -O3 copy_if.c && ./a.out
copy_if_SSE: PASS
copy_if_ref: 0.416 ns / element
copy_if_SSE: 0.239 ns / element
(Note: earlier version of this answer had a stray factor of 16 in the timing code, so earlier numbers were 16x higher than they should have been.)
UPDATE
Inspired by @EOF's solution and compiler-generated code I tried a different approach with SSE4, and got much better results:
#include <stdio.h>
#include <string.h>
#include <smmintrin.h> // SSE4
#include <sys/time.h> // gettimeofday
/* Scalar reference: copy each source byte to the same offset in dest
 * unless it equals `ignore`; skipped dest bytes are left as-is. */
void copy_if_ref(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t idx = 0; idx != size; ++idx)
    {
        const uint8_t byte = src[idx];
        if (byte == ignore)
            continue; /* leave dest[idx] untouched */
        dest[idx] = byte;
    }
}
/* Branch-free copy-if (auto-vectorizable): dest[i] is loaded and stored
 * unconditionally; the select keeps the old dest byte when the source
 * byte equals `ignore`.
 * Fix: the original held bytes in `char` locals while `ignore` is
 * uint8_t. Where `char` is signed, a byte >= 0x80 promotes to a negative
 * int and can never compare equal to an `ignore` >= 0x80, so those
 * bytes were wrongly copied. uint8_t locals make the comparison exact. */
void copy_if_EOF(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
    for (size_t i = 0; i < size; ++i)
    {
        uint8_t temps = src[i];
        uint8_t tempd = dest[i];
        dest[i] = temps == ignore ? tempd : temps;
    }
}
void copy_if_SSE(const uint8_t* src, uint8_t* dest, size_t size, uint8_t ignore)
{
const __m128i vignore = _mm_set1_epi8(ignore);
size_t i;
for (i = 0; i + 16 <= size; i += 16)
{
__m128i vsrc = _mm_loadu_si128((__m128i *)&src[i]);
__m128i vdest = _mm_loadu_si128((__m128i *)&dest[i]);
__m128i vmask = _mm_cmpeq_epi8(vsrc, vignore);
vdest = _mm_blendv_epi8(vsrc, vdest, vmask);
_mm_storeu_si128 ((__m128i *)&dest[i], vdest);
}
for ( ; i < size; ++i)
{
if (src[i] != ignore)
dest[i] = src[i];
}
}
/* Benchmark harness: runs `copy_if(src, dest, size, ignore)` kLoops
 * times, executing `init` before each iteration OUTSIDE the timed
 * region (so the destination buffer can be reset without polluting the
 * measurement), accumulates wall-clock time in milliseconds via
 * gettimeofday (~1 us resolution), and prints the mean time per element
 * in nanoseconds. Wrapped in do { } while (0) so it behaves as a single
 * statement. */
#define TIME_IT(init, copy_if, src, dest, size, ignore) \
do { \
const int kLoops = 1000; \
struct timeval t0, t1; \
double t_ms = 0.0; \
\
for (int i = 0; i < kLoops; ++i) \
{ \
init; \
gettimeofday(&t0, NULL); \
copy_if(src, dest, size, ignore); \
gettimeofday(&t1, NULL); \
t_ms += ((double)(t1.tv_sec - t0.tv_sec) + (double)(t1.tv_usec - t0.tv_usec) * 1.0e-6) * 1.0e3; \
} \
printf("%s: %.3g ns / element\n", #copy_if, t_ms * 1.0e6 / (double)(kLoops * size)); \
} while (0)
/* Test driver: verifies copy_if_EOF and copy_if_SSE against the scalar
 * reference on random data, then times all three implementations.
 * Fixes over the original: malloc results are checked, buffers are
 * freed, and each timed run resets the buffer it actually writes to
 * (the copy_if_ref timing previously reset dest_test while writing
 * dest_ref).
 * NOTE(review): malloc/rand/free require <stdlib.h>, which this
 * program's include list is missing — add it. */
int main()
{
    const size_t N = 10000000;
    uint8_t *src = malloc(N);
    uint8_t *dest_ref = malloc(N);
    uint8_t *dest_init = malloc(N);
    uint8_t *dest_test = malloc(N);
    if (!src || !dest_ref || !dest_init || !dest_test)
    {
        fprintf(stderr, "allocation failed\n");
        free(src);
        free(dest_ref);
        free(dest_init);
        free(dest_test);
        return 1;
    }
    /* Random source and destination contents so skipped bytes are visible. */
    for (size_t i = 0; i < N; ++i)
    {
        src[i] = (uint8_t)rand();
        dest_init[i] = (uint8_t)rand();
    }
    memcpy(dest_ref, dest_init, N);
    copy_if_ref(src, dest_ref, N, 0x42);
    memcpy(dest_test, dest_init, N);
    copy_if_EOF(src, dest_test, N, 0x42);
    printf("copy_if_EOF: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");
    memcpy(dest_test, dest_init, N);
    copy_if_SSE(src, dest_test, N, 0x42);
    printf("copy_if_SSE: %s\n", memcmp(dest_ref, dest_test, N) == 0 ? "PASS" : "FAIL");
    /* Reset the buffer each timed function writes to. */
    TIME_IT(memcpy(dest_ref, dest_init, N), copy_if_ref, src, dest_ref, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_EOF, src, dest_test, N, 0x42);
    TIME_IT(memcpy(dest_test, dest_init, N), copy_if_SSE, src, dest_test, N, 0x42);
    free(src);
    free(dest_ref);
    free(dest_init);
    free(dest_test);
    return 0;
}
Compile and test:
$ gcc -Wall -msse4 -O3 copy_if_2.c && ./a.out
copy_if_EOF: PASS
copy_if_SSE: PASS
copy_if_ref: 0.419 ns / element
copy_if_EOF: 0.114 ns / element
copy_if_SSE: 0.114 ns / element
Conclusion: while _mm_maskmoveu_si128
seems like a good solution for this problem from a functionality perspective, it doesn't seem to be as efficient as using explicit loads, masking and stores. Furthermore, compiler-generated code (see @EOF's answer) seems to be just as fast as explicitly coded SIMD in this instance.
If you found this helpful, you can donate to us via PayPal or buy me a coffee so we can maintain and grow. Thank you!
Donate Us With