optimize unaligned SSE2/AVX2 XOR

In my code I have to handle "unmasking" of websocket packets, which essentially means XOR'ing unaligned data of arbitrary length. Thanks to SO (Websocket data unmasking / multi byte xor) I have already found out how to (hopefully) speed this up using the SSE2/AVX2 extensions, but looking at it now, it seems to me that my handling of unaligned data is completely sub-optimal. Is there any way to optimize my code, or at least make it simpler with the same performance, or is my code already the best-performing way to do this?
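
For reference, the plain byte-by-byte unmasking that this is meant to replace looks roughly like this (just an illustrative sketch; scalar_unmask is not part of my actual code):

#include <stddef.h>
#include <stdint.h>

// straightforward scalar websocket unmasking: XOR each payload byte with the
// corresponding mask byte, cycling through the 4-byte mask (RFC 6455)
void scalar_unmask(uint8_t *data, size_t len, const uint8_t mask[4]) {
   for (size_t i = 0; i < len; i++) {
      data[i] ^= mask[i & 3];
   }
}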

Here's the important part of the code (for the question I'm assuming that there will always be enough data to run the AVX2 loop at least once, but at the same time it will mostly run only a few times):

// circular shift left for uint32
uint32_t cshiftl_u32(uint32_t num, uint8_t shift) {
   return (num << shift) | (num >> (32 - shift));                                                                       
}                                                                                                                     

// circular shift right for uint32
uint32_t cshiftr_u32(uint32_t num, uint8_t shift) {
   return (num >> shift) | (num << (32 - shift));                                                                       
}                                                                                                                     

void optimized_xor_32( uint32_t mask, uint8_t *ds, uint8_t *de ) {
   if (ds == de) return; // zero data len -> nothing to do

   uint8_t maskOffset = 0;

// process single bytes till 4 byte alignment ( <= 3 )
   for (; ds < de && ( (uint64_t)ds & (uint64_t)3 ); ds++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

   if (ds == de) return; // done, return

   if (maskOffset != 0) { // rotate mask so the whole-word XORs below line up with the bytes already consumed
      mask = cshiftr_u32(mask, maskOffset * 8); // little-endian: rotate right by the number of consumed bytes

      maskOffset = 0;
   }

// process a single 4 byte block till 8 byte alignment ( <= 1 )
   uint8_t *de32 = (uint8_t *)((uint64_t)de & ~((uint64_t)3));

   if ( ds < de32 && ( (uint64_t)ds & (uint64_t)7 ) ) {
      *(uint32_t *)ds ^= mask; // mask is uint32_t

      if ((ds += 4) == de) return;
   }

// process a single 8 byte block till 16 byte alignment ( <= 1 )
   uint64_t mask64 = (uint64_t)mask | ((uint64_t)mask << 32);
   uint8_t *de64 = (uint8_t *)((uint64_t)de & ~((uint64_t)7));

   if ( ds < de64 && ( (uint64_t)ds & (uint64_t)15 ) ) {
      *(uint64_t *)ds ^= mask64;

      if ((ds += 8) == de) return; // done, return
   }


// process 16 byte block till 32 byte alignment ( <= 1) (if supported)
#ifdef CPU_SSE2 
   __m128i v128, v128_mask;
   v128_mask = _mm_set1_epi32(mask);

   uint8_t *de128 = (uint8_t *)((uint64_t)de & ~((uint64_t)15));

   if ( ds < de128 && ( (uint64_t)ds & (uint64_t)31 ) ) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);

      if ((ds += 16) == de) return; // done, return
   }

#endif
#ifdef CPU_AVX2 // process 32 byte blocks (if supported -> haswell upwards)
   __m256i v256, v256_mask;
   v256_mask = _mm256_set1_epi32(mask);

   uint8_t *de256 = (uint8_t *)((uint64_t)de & ~((uint64_t)31));

   for (; ds < de256; ds+=32) {
      v256 = _mm256_load_si256((__m256i *)ds);
      v256 = _mm256_xor_si256(v256, v256_mask);
      _mm256_store_si256((__m256i *)ds, v256);
   }

   if (ds == de) return; // done, return
#endif
#ifdef CPU_SSE2 // process remaining 16 byte blocks (if supported)
   for (; ds < de128; ds+=16) {
      v128 = _mm_load_si128((__m128i *)ds);
      v128 = _mm_xor_si128(v128, v128_mask);
      _mm_store_si128((__m128i *)ds, v128);
   }

   if (ds == de) return; // done, return

#endif
   // process remaining 8 byte blocks 
   // this should always be supported, so remaining can be assumed to be executed <= 1 times
   for (; ds < de64; ds += 8) {
      *(uint64_t *)ds ^= mask64;
   }

   if (ds == de) return; // done, return

   // process remaining 4 byte blocks ( <= 1)
   if (ds < de32) {
      *(uint32_t *)ds ^= mask;

      if ((ds += 4) == de) return; // done, return
   }


   // process remaining bytes ( <= 3)

   for (; ds < de; ds ++) {
      *ds ^= *((uint8_t *)(&mask) + maskOffset);
      maskOffset = (maskOffset + 1) & (uint8_t)3;
   }

}

P.S.: Please ignore the use of #ifdef instead of cpuid or the like for cpu flag detection.

griffin asked Nov 02 '22 17:11
1 Answer

Unlike what it says in the manual, most Intel processors are actually quite good at handling unaligned data. Since you are using Intel's compiler intrinsics for vector handling, I assume you have access to a reasonably recent version of icc.
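
As an illustration of that point, a minimal sketch that simply relies on unaligned loads and stores (_mm256_loadu_si256 / _mm256_storeu_si256) instead of an alignment prologue, assuming AVX2 is available (unaligned_avx2_xor is just an illustrative name):

#include <immintrin.h>
#include <stddef.h>
#include <stdint.h>

// XOR len bytes at d with a repeated 4-byte mask using unaligned AVX2
// loads/stores; a scalar loop handles the final < 32 bytes.
void unaligned_avx2_xor(uint8_t *d, size_t len, uint32_t mask) {
    __m256i vmask = _mm256_set1_epi32(mask);
    size_t i = 0;

    for (; i + 32 <= len; i += 32) {
        __m256i v = _mm256_loadu_si256((const __m256i *)(d + i));
        _mm256_storeu_si256((__m256i *)(d + i), _mm256_xor_si256(v, vmask));
    }

    for (; i < len; i++)
        d[i] ^= ((uint8_t *)&mask)[i & 3]; // i is a multiple of 32 here, so the mask bytes stay in phase
}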

If you cannot naturally align your data, then I am afraid that what you are doing is as close as you can get to maximum performance. In terms of making the code more readable and deployable on Xeon Phi (64-byte vector registers) and future longer-vector processors, I would suggest you start using Intel Cilk Plus.

Example:

void intel_cilk_xor(uint32_t mask, uint8_t *d, size_t length) {
    while (length & 0x3) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    // switch to 4 bytes per block
    uint32_t *_d = (uint32_t *)d;
    length >>= 2;

    // Intel Cilk Plus Array Notation
    // Should expand automatically to the best possible SIMD instructions
    // you are compiling for
    _d[0:length] ^= mask;
}

Please note that I did not test this code, as I do not have access to an Intel compiler right now. If you encounter problems, I can go over it when I am back in my office next week.

If you prefer intrinsics instead, then proper use of preprocessor macros can make your life significantly easier:

#include <immintrin.h>   // SSE2/AVX2 intrinsics
#include <stdint.h>

#if defined(__MIC__)
// intel Xeon Phi
#define VECTOR_BLOCKSIZE 64
// I do not remember the correct types/instructions right now
#error "TODO: MIC handling"
#elif defined(CPU_AVX2)
#define VECTOR_BLOCKSIZE 32
typedef __m256i my_vector_t;
#define VECTOR_LOAD_MASK _mm256_set1_epi32
#define VECTOR_XOR(d, mask) _mm256_store_si256(d, _mm256_xor_si256(_mm256_load_si256(d), mask))
#elif defined(CPU_SSE2)
#define VECTOR_BLOCKSIZE 16
typedef __m128i my_vector_t;
#define VECTOR_LOAD_MASK _mm_set1_epi32
#define VECTOR_XOR(d, mask) _mm_store_si128(d, _mm_xor_si128(_mm_load_si128(d), mask))
#else
#define VECTOR_BLOCKSIZE 8
#define VECTOR_LOAD_MASK(mask) (((uint64_t)(mask) << 32) | (mask))
#define VECTOR_XOR(d, mask) (*(d)) ^= (mask)
typedef uint64_t my_vector_t;
#endif

void optimized_xor_32( uint32_t mask, uint8_t *d, size_t length ) {
    size_t i;

    // there really is no point in having extra
    // branches for different vector lengths if they are
    // executed at most once
    // branch prediction is your friend here
    // so we do one byte at a time until the block size
    // is reached

    while (length && ((uintptr_t)d & (VECTOR_BLOCKSIZE - 1))) {
        *(d++) ^= mask;
        asm ("rold $8, %0" : "+g" (mask) :: "cc"); // rotate dword one byte left
        length--;
    }

    my_vector_t * d_vector = (my_vector_t *)d;
    my_vector_t vector_mask = VECTOR_LOAD_MASK(mask);

    size_t vector_length = length / VECTOR_BLOCKSIZE; // compiler will optimise this to a bit shift
    length &= VECTOR_BLOCKSIZE - 1; // remaining length

    for (i = 0; i < vector_length; i++) {
      VECTOR_XOR(d_vector + i, vector_mask);
    }

    // process the tail
    d = (uint8_t*)(d_vector + i);
    for (i = 0; i < length; i++) {
      d[i] ^= mask;
      asm ("rold $8, %0" : "+g" (mask) :: "cc");
    }

}
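
A hypothetical wrapper, just to illustrate the intended use (names like unmask_frame are illustrative only):

#include <string.h>

// unmask a received websocket payload in place, given the 4 mask bytes
// from the frame header
void unmask_frame(uint8_t *payload, size_t length, const uint8_t mask_bytes[4]) {
    uint32_t mask;
    memcpy(&mask, mask_bytes, sizeof mask); // keep the mask's in-memory byte order
    optimized_xor_32(mask, payload, length);
}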

On another note: you may want to use the x86 rotate instruction instead of bit shifts to rotate the mask:

#define asm_rol(var, bits) asm ("rol %1, %0" : "+r" (var) : "c" ((uint8_t)bits) : "cc")
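
For example, rotating the 32-bit mask one byte left with that macro (the same effect as the inline rotate above) would be:

asm_rol(mask, 8); // rotate mask left by 8 bits, i.e. one byte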
Sergey L. answered Nov 15 '22 04:11