I have this encoding method that works by encoding small blocks of 16x int64_t to a small block of 16 flag nibbles packed into 8 bytes, followed by 1 or more bytes of payload for each input int64_t:

- flag: 1 byte can store the flags of 2 values
- sign(value): stored in the flag (the high bit in the nibble)
- abs(value): stored in the buffer, variable number of bytes

This example benchmark is simplified. I have tested it, and for my use case with actual data it's faster than LEB128 (aka varint) and lz4, and it also compresses better. So let's focus on this encoding method or something similar.

How can I improve this code? I mostly care about decompression speed. Some changes to the encode/decode format (or another format) are possible if that improves speed. I wonder if SIMD can be used in this case.
Special notes:

- Most of the time, abs(value) has length <= 2 bytes, and abs(value) can almost always be represented by <= 7 bytes. Similarly, 99% of the time, 16 consecutive abs(value) can be represented by <= 31 bytes.
- encode and decode_original below are the reference implementations of this format.
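A worked example (my own numbers, chosen only for illustration) of how one pair of values lands in a block:

// Illustrative only, not part of the benchmark code below.
//   value0 =  300: sign = 0, abs = 0x012C -> 2 payload bytes -> flag nibble = 0b0010
//   value1 =   -2: sign = 1, abs = 0x02   -> 1 payload byte  -> flag nibble = 0b1001
// The two nibbles share one flag byte: (0b1001 << 4) | 0b0010 = 0x92.
// Payload bytes emitted (little endian, lowest byte first): 0x2C 0x01 0x02.
// A full block stores 8 such flag bytes (16 nibbles) followed by all payload bytes.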
// ##### from @aqrit's answer, tweaked by @petercordes #######
#include <cstddef> // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy
unsigned char* encode_buf_bmi(size_t n, const int64_t* src, unsigned char* dst) {
unsigned char* key_ptr = dst;
size_t num_chunks = n >> 3;
unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
while (num_chunks--) {
uint64_t key = 0;
for (size_t i = 0; i < 8; i++) {
uint64_t v = (uint64_t)*src++;
memcpy(data_ptr, &v, sizeof(v)); // assumes little endian.
v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
v = (v << 8) | v; // combine lengths of 7 and 8
v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel...
data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
key |= v << (i * 3);
}
memcpy(key_ptr, &key, 3); // assumes little endian. Probably faster with overlapping 4-byte stores, except for the last chunk to avoid stepping on data
key_ptr += 3;
}
// TODO: tail loop...
return data_ptr; // end of used part of buffer
}
void decode_buf_bmi(size_t n, const unsigned char* src, int64_t* dst) {
const unsigned char* key_ptr = src;
size_t num_chunks = n >> 3;
const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian)
while (num_chunks--) {
unsigned keys;
//memcpy(&keys, key_ptr, 3); // big slowdown with GCC13.2: like 258 ms per buffer on Skylake 3.9GHz, n_msg = 100'000'000
memcpy(&keys, key_ptr, 4); // 138 ms per buffer with 4-byte loads. (Clang 16: 138 ms per buffer either way.)
key_ptr += 3;
for (size_t i = 0; i < 8; i++) {
uint64_t v;
size_t k = keys & 0x07;
size_t len = k + ((k + 1) >> 3); // k==7 is len=8
uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
size_t shift = (64 - (len * 8)) & (size_t)mask;
keys >>= 3;
data_ptr += len;
memcpy(&v, data_ptr, sizeof(v));
v &= mask;
*dst++ = (int64_t)v >> shift;
}
}
data_ptr += sizeof(uint64_t);
}
// TODO: tail loop...
}
// ############## From @Soonts' answer ##############
#include <immintrin.h>
#include <stdint.h>
// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
__m128i v = src;
// https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );
// So far we have computed inclusive prefix sum
// Keep the last byte which is total sum of bytes in the vector
uint16_t tmp = _mm_extract_epi16( v, 7 );
tmp >>= 8;
sum = tmp;
// Subtract original numbers to make exclusive sum = byte offsets to load
return _mm_sub_epi8( v, src );
}
// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
// Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
__m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );
// Mask away the higher pieces in the loaded numbers
__m256i shift = _mm256_cvtepu8_epi64( bits );
const __m256i ones = _mm256_set1_epi32( -1 );
__m256i mask = _mm256_sllv_epi64( ones, shift );
return _mm256_andnot_si256( mask, r );
}
inline __m256i applySigns( __m256i v, __m128i signs )
{
// Sign extend the masks from bytes to int64
__m256i mask = _mm256_cvtepi8_epi64( signs );
// Conditionally negate the numbers
__m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
return _mm256_blendv_epi8( v, neg, mask );
}
class BlockDecoder
{
// Load offsets, in bytes
__m128i offsetBytes;
// Payload bits per element, the bytes are in [ 0 .. 64 ] interval
__m128i payloadBits;
// 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
__m128i signs;
// Count of payload bytes in the complete block
size_t payloadBytes;
// Decode the block header
inline void loadHeader( const uint8_t* rsi )
{
// Load 8 bytes, and zero extend them into uint16_t
const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );
// Unpack lengths
const __m128i seven = _mm_set1_epi8( 7 );
const __m128i l4 = _mm_slli_epi16( v, 4 );
__m128i lengths = _mm_or_si128( v, l4 );
lengths = _mm_and_si128( lengths, seven );
// Transform 7 into 8
__m128i tmp = _mm_cmpeq_epi8( lengths, seven );
lengths = _mm_sub_epi8( lengths, tmp );
// Byte offsets to load 16 numbers, and count of payload bytes in the complete block
offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
// Count of payload bits in the loaded numbers, lengths * 8
payloadBits = _mm_slli_epi16( lengths, 3 );
// Signs vector, we only use the highest 0x80 bit in these bytes
signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
}
// Decode and store 4 numbers
template<int slice>
inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
{
uint32_t off;
__m128i bits, s;
if constexpr( slice != 0 )
{
off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
bits = _mm_shuffle_epi32( payloadBits, imm );
s = _mm_shuffle_epi32( signs, imm );
}
else
{
off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
// For the first slice of the block, the 4 lowest bytes are in the correct locations already
bits = payloadBits;
s = signs;
}
__m256i v = loadAndMask( payload, off, bits );
v = applySigns( v, s );
_mm256_storeu_si256( ( __m256i* )rdi, v );
}
public:
// Decode and store a block of 16 numbers, return pointer to the next block
const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
{
loadHeader( rsi );
rsi += 8;
decodeSlice<0>( rdi, rsi );
decodeSlice<1>( rdi + 4, rsi );
decodeSlice<2>( rdi + 8, rsi );
decodeSlice<3>( rdi + 12, rsi );
return rsi + payloadBytes;
}
};
// ############ reference version and updated benchmark framework ###########
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib> // rand, exit
#include <ctime>   // time
#include <algorithm>
#include <vector>
#include <chrono>
#include <iomanip>
using namespace std;
// https://en.wikipedia.org/wiki/Xorshift IIRC, wikipedia used to have different constants for some PRNG, maybe some of these, vs. the upstream site; I just copied wikipedia
#include <stdint.h>
uint64_t rol64(uint64_t x, int k)
{
return (x << k) | (x >> (64 - k));
}
struct xoshiro256ss_state {
uint64_t s[4];
};
static xoshiro256ss_state rng_state; // init by main
uint64_t xoshiro256ss(struct xoshiro256ss_state *state)
{
uint64_t *s = state->s;
uint64_t const result = rol64(s[1] * 5, 7) * 9;
uint64_t const t = s[1] << 17;
s[2] ^= s[0];
s[3] ^= s[1];
s[1] ^= s[2];
s[0] ^= s[3];
s[2] ^= t;
s[3] = rol64(s[3], 45);
return result;
}
namespace {
class MyTimer {
std::chrono::time_point<std::chrono::system_clock> start;
public:
void startCounter() {
start = std::chrono::system_clock::now();
}
int64_t getCounterNs() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count();
}
int64_t getCounterMs() {
return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - start).count();
}
double getCounterMsPrecise() {
return std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now() - start).count()
/ 1000000.0;
}
};
}
template <typename T>
void leb128_encode(uint8_t* data, T value, size_t &idx)
{
bool more = true;
bool negative = value < 0;
constexpr int size = sizeof(value) * 8;
while (more) {
uint8_t byte = value & 0x7F;
value >>= 7;
if (negative) value |= (~0LL << (size - 7));
if ((value == 0 && (byte & 0x40) == 0) || (value == -1 && (byte & 0x40) != 0)) {
more = false;
} else {
byte |= 0x80;
}
data[idx++] = byte;
}
}
template <typename T>
void leb128_decode(uint8_t* data, T &value, size_t &idx)
{
value = 0;
int shift = 0;
constexpr int size = sizeof(value) * 8;
uint8_t byte;
do {
byte = data[idx++];
value |= int64_t(byte & 0x7F) << shift;
shift += 7;
} while (byte & 0x80);
if (shift < size && (byte & 0x40)) {
value |= (~0LL << shift);
}
}
int64_t n_msg = 100'000'000;
constexpr int BLK_SIZE = 16;
uint8_t* buffer;
int64_t *original;
int64_t *decoded;
int64_t *dummy_array;
// encode 1 value using variable length encoding,
// storing the result in a flag
void encode(int64_t value, uint8_t &flag, size_t &idx)
{
bool negative = value < 0;
value = abs(value);
uint8_t byte_cnt = 0;
while (value > 0) {
buffer[idx] = value & 0xFF; // lowest byte
idx++;
value >>= 8;
byte_cnt++;
}
// special cases. Since we only have 3 bits to represent 9 values (0->8),
// we choose to group case 7-8 together because they're rarest.
if (byte_cnt == 7) {
buffer[idx] = 0;
idx++;
} else if (byte_cnt == 8) {
// since we only have 3 bits, we use 7 to represent 8 bytes
byte_cnt = 7;
}
flag = byte_cnt; // bit 0-2 represent length
if (negative) flag |= 0b1000; // bit 3 represent sign (1 means negative)
}
// returns compression ratio
double __attribute__ ((noinline)) encode_all(int type = 0)
{
if (type == 0) {
// Soonts version uses the same format as this reference version
size_t idx = 0;
// encode in blocks of 16 values
for (size_t i = 0; i < n_msg; i += 16) {
size_t flag_idx = idx;
idx += 8; // first 8 bytes of a block are for sign/length flags
for (int t = 0; t < BLK_SIZE; t += 2) {
uint8_t low_flag, high_flag;
encode(original[i + t], low_flag, idx);
encode(original[i + t + 1], high_flag, idx);
buffer[flag_idx + t / 2] = (high_flag << 4) | low_flag;
}
}
return (double(idx) / (n_msg * 8));
} else if (type == 1) {
const unsigned char *end = encode_buf_bmi(n_msg, original, buffer);
return (double(end - buffer) / (n_msg * 8));
} else if (type == 2) {
size_t idx = 0;
for (int i = 0; i < n_msg; i++) leb128_encode(buffer, original[i], idx);
return (double(idx) / (n_msg * 8));
}
cout << "encode_all unknown type " << type << std::endl;
exit(1);
}
template <typename T>
void extract_flag(uint8_t flag, T &low_sign, T &low_length, T &high_sign, T &high_length)
{
low_sign = flag & 0b1000;
low_length = flag & 0b0111;
high_sign = (flag >> 4) & 0b1000;
high_length = (flag >> 4) & 0b0111;
}
void __attribute__ ((noinline)) decode_original()
{
static constexpr int64_t mult[] = {
1, 1LL << 8, 1LL << 16, 1LL << 24, 1LL << 32, 1LL << 40, 1LL << 48, 1LL << 56
};
size_t idx = 0, num_idx = 0;
for (size_t i = 0; i < n_msg; i += 16) {
// first 8 bytes of a block are flags
int signs[BLK_SIZE], lens[BLK_SIZE];
for (int t = 0; t < BLK_SIZE; t += 2) {
extract_flag(buffer[idx], signs[t], lens[t], signs[t + 1], lens[t + 1]);
idx++;
}
for (int t = 0; t < BLK_SIZE; t++) if (lens[t] == 7) lens[t] = 8; // special case
// decode BLK_SIZE values
for (int t = 0; t < BLK_SIZE; t++) {
int64_t value = 0;
for (int i = 0; i < lens[t]; i++) value += mult[i] * buffer[idx + i];
if (signs[t]) value = -value;
decoded[num_idx + t] = value;
idx += lens[t];
}
num_idx += BLK_SIZE;
}
}
void __attribute__ ((noinline)) decode_soonts()
{
BlockDecoder dec;
const uint8_t* rsi = buffer;
int64_t* rdi = decoded;
for( size_t i = 0; i < n_msg; i += 16 )
{
rsi = dec.decode( rdi, rsi );
rdi += 16;
}
}
void __attribute__ ((noinline)) decode_aqrit()
{
decode_buf_bmi(n_msg, buffer, decoded);
}
void __attribute__ ((noinline)) decode_leb()
{
size_t idx = 0;
for (int i = 0; i < n_msg; i++) {
leb128_decode(buffer, decoded[i], idx);
}
}
//------------------
//------------------
//------------------ MAIN
void __attribute__ ((noinline)) gen_data()
{
for (size_t i = 0; i < n_msg; i++) {
uint64_t modifier = xoshiro256ss(&rng_state); // low 32 bits decide magnitude range, high bits decide sign, to avoid correlation with %100 over the full thing.
if ( uint32_t(modifier) % 100 == 0) {
original[i] = int64_t(xoshiro256ss(&rng_state)) * 1'000'000'000;
} else {
original[i] = xoshiro256ss(&rng_state) % 70'000;
}
if ((modifier>>62) == 0)
original[i] *= -1;
}
}
void __attribute__ ((noinline)) check(const string &name)
{
for (size_t i = 0; i < n_msg; i++) if (original[i] != decoded[i]) {
cout << name << " wrong at " << i << " " << original[i] << " " << decoded[i] << "\n";
exit(1);
}
}
int64_t volatile dummy = 42;
constexpr int N_DUMMY = 8'000'000;
void __attribute__ ((noinline)) clear_cache()
{
#if 1
for (int i = 0; i < N_DUMMY; i++) dummy_array[i] = dummy;
std::sort(dummy_array, dummy_array + N_DUMMY);
dummy = dummy_array[rand() % N_DUMMY];
#else
asm("" ::: "memory");
#endif
}
using FuncPtr = void (*)();
int64_t __attribute__ ((noinline)) test(FuncPtr func, const string &name)
{
clear_cache();
MyTimer timer;
timer.startCounter();
func();
int64_t cost = timer.getCounterNs();
check(name);
return cost;
}
void printTableRow(const std::vector<std::string>& columns, int columnWidth) {
for (const std::string& column : columns) {
std::cout << std::left << std::setw(columnWidth) << column << " | ";
}
std::cout << std::endl;
}
int main()
{
// srand(time(NULL));
rng_state.s[0] = time(nullptr); // other elements left zero. This is terrible but sufficient for our purposes
MyTimer timer;
buffer = new uint8_t[n_msg * 10];
original = new int64_t[n_msg];
decoded = new int64_t[n_msg];
dummy_array = new int64_t[N_DUMMY]; // enough to flush cache
memset(buffer, 0, n_msg * 10);
memset(original, 0, n_msg * 8);
memset(decoded, 0, n_msg * 8);
memset(dummy_array, 0, N_DUMMY * 8);
int n_test = 10;
constexpr int L = 4;
string names[L] = {"original", "soonts", "aqrit", "leb"};
int64_t total_costs[L] = {0, 0, 0, 0};
for (int t = 0; t < n_test; t++) {
gen_data();
double ratio_original = encode_all(0);
int64_t costs[L];
costs[0] = test(decode_original, names[0]);
costs[1] = test(decode_soonts, names[1]);
double ratio_aqrit = encode_all(1);
costs[2] = test(decode_aqrit, names[2]);
double ratio_leb = encode_all(2);
costs[3] = test(decode_leb, names[3]);
if (t == 0) {
cout << "Compression ratios: " << ratio_original << " " << ratio_aqrit << " " << ratio_leb << "\n";
}
cout << t << ": ";
for (int i = 0; i < L; i++) {
cout << (double(costs[i]) / 1'000'000) << " ";
total_costs[i] += costs[i];
}
cout << "\n";
}
vector<string> headers = {"name", "cost (ms)", "cost/int64 (ns)", "cost/byte (ns)"};
printTableRow(headers, 15);
for (int i = 0; i < L; i++) {
double average_cost_ms = double(total_costs[i]) / n_test / 1'000'000;
double cost_int64 = double(total_costs[i]) / (n_msg * n_test);
double cost_byte = cost_int64 / 8;
vector<string> row;
row.push_back(names[i]);
row.push_back(to_string(average_cost_ms));
row.push_back(to_string(cost_int64));
row.push_back(to_string(cost_byte));
printTableRow(row, 15);
}
return 0;
}
Edit: I've just found out that someone discovered pretty much the same encode/decode format (but for int32) in 2017. What a fun coincidence, with 2 people independently "inventing" the same thing at 2 different times :D
2nd version of Soonts' answer (func, inl func, vpgather): code here. It looks cleaner and doesn't read past the end of the input buffer, but it's measurably slower, so I'm keeping the 1st version.

- func: this version doesn't require a decoder object. force_inline = 0, use_gather = 0
- inl func: force_inline = 1, use_gather = 0
- vpgather: force_inline = 0, use_gather = 1
Compile command: g++ -o main test.cpp -std=c++17 -O3 -mavx2 -mlzcnt -march=native
Benchmark results are below; -march=native gives a measurable performance boost (~8% in the best case).
Time to decompress 10^8 int64, averaged over 10 runs:
// AMD EPYC 75F3 (Zen 3 x86-64), 2950MHz
Compression ratios: 0.327437 0.369956
0: 447.651 65.0414 110.171
1: 442.853 64.5952 105.635
2: 434.715 64.1977 108.984
3: 430.09 63.0572 104.074
4: 424.451 64.6397 103.604
5: 436.631 65.0076 104.04
6: 429.59 64.1896 102.936
7: 434.184 64.0522 104.035
8: 430.223 69.3877 105.922
9: 420.659 63.7519 105.563
// AMD EPYC 75F3 -march=native
name | cost (ms) | cost/int64 (ns) | cost/byte (ns) |
original | 433.104635 | 4.331046 | 0.541381 |
soonts | 64.792034 | 0.647920 | 0.080990 |
aqrit | 105.496544 | 1.054965 | 0.131871 |
leb | 438.236469 | 4.382365 | 0.547796 |
soonts (func) | 68.403465 | 0.684035 | 0.085504 |
soon (inl func) | 71.202335 | 0.712023 | 0.089003 |
soon (vpgather) | 78.308508 | 0.783085 | 0.097886 |
// AMD EPYC 75F3 without -march=native
name | cost (ms) | cost/int64 (ns) | cost/byte (ns) |
original | 424.631426 | 4.246314 | 0.530789 |
soonts | 69.623498 | 0.696235 | 0.087029 |
aqrit | 109.895916 | 1.098959 | 0.137370 |
// Intel(R) Xeon(R) Gold 6252N CPU (supposedly 2.3 GHz, I can't change/lock frequency on this borrowed machine)
name | cost (ms) | cost/int64 (ns) | cost/byte (ns) |
original | 549.280441 | 5.492804 | 0.686601 |
soonts | 118.401718 | 1.184017 | 0.148002 |
aqrit | 151.811314 | 1.518113 | 0.189764 |
soon (func) | 122.440794 | 1.224408 | 0.153051 |
soon (inl func) | 120.559447 | 1.205594 | 0.150699 |
soon (vpgather) | 115.704298 | 1.157043 | 0.144630 |
It’s relatively tricky, but still possible to vectorize your decoder. Here’s an implementation which, on my computer with an AMD Zen 3 CPU, outperforms your reference version by a factor of 2.8.
#include <immintrin.h>
#include <stdint.h>
// Exclusive prefix sum of unsigned bytes
// When the sum of all bytes exceeds 0xFF, the output is garbage
// Which is fine here because our bytes are in [0..8] interval
inline __m128i exclusivePrefixSum( const __m128i src, size_t& sum )
{
__m128i v = src;
// https://en.wikipedia.org/wiki/Prefix_sum#/media/File:Hillis-Steele_Prefix_Sum.svg
v = _mm_add_epi8( v, _mm_slli_si128( v, 1 ) );
v = _mm_add_epi8( v, _mm_slli_si128( v, 2 ) );
v = _mm_add_epi8( v, _mm_slli_si128( v, 4 ) );
v = _mm_add_epi8( v, _mm_slli_si128( v, 8 ) );
// So far we have computed inclusive prefix sum
// Keep the last byte which is total sum of bytes in the vector
uint16_t tmp = _mm_extract_epi16( v, 7 );
tmp >>= 8;
sum = tmp;
// Subtract original numbers to make exclusive sum = byte offsets to load
return _mm_sub_epi8( v, src );
}
// Load 4 uint64 numbers from ( rsi + offsets[ i ] ), keep the lowest bits[ i ] bits in the numbers
inline __m256i loadAndMask( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
// Load 4 uint64_t numbers from the correct locations, without AVX2 gathers
const int64_t* s0 = (const int64_t*)( rsi + (uint8_t)offsets );
const int64_t* s1 = (const int64_t*)( rsi + (uint8_t)( offsets >> 8 ) );
const int64_t* s2 = (const int64_t*)( rsi + (uint8_t)( offsets >> 16 ) );
const int64_t* s3 = (const int64_t*)( rsi + ( offsets >> 24 ) );
__m256i r = _mm256_setr_epi64x( *s0, *s1, *s2, *s3 );
// Mask away the higher pieces in the loaded numbers
__m256i shift = _mm256_cvtepu8_epi64( bits );
const __m256i ones = _mm256_set1_epi32( -1 );
__m256i mask = _mm256_sllv_epi64( ones, shift );
return _mm256_andnot_si256( mask, r );
}
inline __m256i applySigns( __m256i v, __m128i signs )
{
// Sign extend the masks from bytes to int64
__m256i mask = _mm256_cvtepi8_epi64( signs );
// Conditionally negate the numbers
__m256i neg = _mm256_sub_epi64( _mm256_setzero_si256(), v );
return _mm256_blendv_epi8( v, neg, mask );
}
class BlockDecoder
{
// Load offsets, in bytes
__m128i offsetBytes;
// Payload bits per element, the bytes are in [ 0 .. 64 ] interval
__m128i payloadBits;
// 16 bytes with the 0x80 bit set when the corresponding input was negative; the rest of the bits are unused
__m128i signs;
// Count of payload bytes in the complete block
size_t payloadBytes;
// Decode the block header
inline void loadHeader( const uint8_t* rsi )
{
// Load 8 bytes, and zero extend them into uint16_t
const __m128i v = _mm_cvtepu8_epi16( _mm_loadu_si64( rsi ) );
// Unpack lengths
const __m128i seven = _mm_set1_epi8( 7 );
const __m128i l4 = _mm_slli_epi16( v, 4 );
__m128i lengths = _mm_or_si128( v, l4 );
lengths = _mm_and_si128( lengths, seven );
// Transform 7 into 8
__m128i tmp = _mm_cmpeq_epi8( lengths, seven );
lengths = _mm_sub_epi8( lengths, tmp );
// Byte offsets to load 16 numbers, and count of payload bytes in the complete block
offsetBytes = exclusivePrefixSum( lengths, payloadBytes );
// Count of payload bits in the loaded numbers, lengths * 8
payloadBits = _mm_slli_epi16( lengths, 3 );
// Signs vector, we only use the highest 0x80 bit in these bytes
signs = _mm_or_si128( _mm_slli_epi16( v, 8 ), l4 );
}
// Decode and store 4 numbers
template<int slice>
inline void decodeSlice( int64_t* rdi, const uint8_t* payload ) const
{
uint32_t off;
__m128i bits, s;
if constexpr( slice != 0 )
{
off = (uint32_t)_mm_extract_epi32( offsetBytes, slice );
constexpr int imm = _MM_SHUFFLE( slice, slice, slice, slice );
bits = _mm_shuffle_epi32( payloadBits, imm );
s = _mm_shuffle_epi32( signs, imm );
}
else
{
off = (uint32_t)_mm_cvtsi128_si32( offsetBytes );
// For the first slice of the block, the 4 lowest bytes are in the correct locations already
bits = payloadBits;
s = signs;
}
__m256i v = loadAndMask( payload, off, bits );
v = applySigns( v, s );
_mm256_storeu_si256( ( __m256i* )rdi, v );
}
public:
// Decode and store a block of 16 numbers, return pointer to the next block
const uint8_t* decode( int64_t* rdi, const uint8_t* rsi )
{
loadHeader( rsi );
rsi += 8;
decodeSlice<0>( rdi, rsi );
decodeSlice<1>( rdi + 4, rsi );
decodeSlice<2>( rdi + 8, rsi );
decodeSlice<3>( rdi + 12, rsi );
return rsi + payloadBytes;
}
};
I’ve tested the code very little, and it could be possible to improve the performance further.
As you see, the implementation requires AVX2, and is branchless.
Usage example:
BlockDecoder dec;
const uint8_t* rsi = buffer;
int64_t* rdi = decoded;
for( size_t i = 0; i < n_msg; i += 16 )
{
rsi = dec.decode( rdi, rsi );
rdi += 16;
}
NB! Typically, when the last number in the stream doesn’t use the maximum 8 bytes, the implementation loads a few bytes past the end of the encoded buffer. You can pad your compressed data, or implement a special case to decode the final block of the stream, or adjust the encoder to always emit 8 bytes for the final number of the final block.
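For example, the padding option can be as simple as over-allocating the compressed buffer; a minimal sketch (the worst-case size formula and the 8 spare bytes are my assumptions, reusing n_msg from the benchmark):

// Worst case per 16-value block: 8 flag bytes + 16 * 8 payload bytes = 136 bytes.
size_t maxEncodedSize = ( ( n_msg + 15 ) / 16 ) * 136;
uint8_t* buffer = new uint8_t[ maxEncodedSize + 8 ]; // 8 spare bytes so the decoder's trailing 8-byte load stays inside the allocation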
P.S. The initial version used the _mm256_i32gather_epi64 instruction to load four int64 numbers from memory, using a base pointer + four byte offsets from another vector. However, on AMD CPUs it was slightly slower than 4 scalar loads. If you target Intel, it might be better to use the initial version; see edit history.
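For reference, a gather-based loadAndMask could look roughly like this; this is my sketch of the idea (loadAndMaskGather is a hypothetical name, not the answer's original code):

#include <immintrin.h>
#include <stdint.h>
// Sketch: same inputs as loadAndMask above, but the 4 loads go through one AVX2 gather.
inline __m256i loadAndMaskGather( const uint8_t* rsi, uint32_t offsets, __m128i bits )
{
    // Zero-extend the 4 packed byte offsets into 4 int32 indices
    const __m128i idx = _mm_cvtepu8_epi32( _mm_cvtsi32_si128( (int)offsets ) );
    // Gather 4 int64 numbers from rsi + offsets[ i ], scale = 1 byte
    __m256i r = _mm256_i32gather_epi64( (const long long*)rsi, idx, 1 );
    // Mask away the higher pieces, same as the scalar-load version
    const __m256i shift = _mm256_cvtepu8_epi64( bits );
    const __m256i mask = _mm256_sllv_epi64( _mm256_set1_epi32( -1 ), shift );
    return _mm256_andnot_si256( mask, r );
}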
A scalar implementation could be branchless, for the most part.
Padding out the control bits with the sign bits probably costs bits in the long run. Instead, here all the control bits are packed at the start of the stream (similar to streamvbyte).
Variable sign-extension is used because it is cheaper than zigzag for bytewise scalar implementations.
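To make the comparison concrete, here is a small sketch of the two options for a single value (my illustration; zigzag_enc, zigzag_dec, and sign_extend are hypothetical helpers, not part of the answer):

#include <cstdint>
// Zigzag folds the sign into bit 0: extra shift/xor work per value on both encode and decode.
inline uint64_t zigzag_enc( int64_t v )   { return ( (uint64_t)v << 1 ) ^ (uint64_t)( v >> 63 ); }
inline int64_t  zigzag_dec( uint64_t zz ) { return (int64_t)( zz >> 1 ) ^ -(int64_t)( zz & 1 ); }
// Variable sign-extension stores the low `len` bytes of v unchanged; the decoder loads
// 8 bytes ending at the value (left-aligning it in the register) and does one arithmetic
// shift right, which is exactly the `(int64_t)v >> shift` step in the decode loop below.
inline int64_t sign_extend( uint64_t loaded64, int len ) { return (int64_t)loaded64 >> ( 64 - 8 * len ); } // assumes 1 <= len <= 8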
Proof of concept (it assumes little-endian byte order, as noted in the comments, and leaves the tail loops as TODOs):
#include <cstddef> // size_t
#include <cstdint> // uint64_t, int64_t
#include <immintrin.h> // lzcnt intrinsic
#include <cstring> // memcpy
unsigned char* encode(size_t n, const int64_t* src, unsigned char* dst) {
unsigned char* key_ptr = dst;
size_t num_chunks = n >> 3;
unsigned char* data_ptr = &dst[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
while (num_chunks--) {
uint64_t key = 0;
for (size_t i = 0; i < 8; i++) {
uint64_t v = (uint64_t)*src++;
memcpy(data_ptr, &v, sizeof(v)); // assumes little endian
v = (v + v) ^ v; // clear redundant sign bits (and trash everything else)
v = (v << 8) | v; // combine lengths of 7 and 8
v = (_lzcnt_u64(v | 1) ^ 63) >> 3; // use BSR on intel...
data_ptr += v + ((v + 1) >> 3); // key of 7 is length of 8
key |= v << (i * 3);
}
memcpy(key_ptr, &key, 3); // assumes little endian
key_ptr += 3;
}
// TODO: tail loop...
return dst;
}
void decode(size_t n, const unsigned char* src, int64_t* dst) {
const unsigned char* key_ptr = src;
size_t num_chunks = n >> 3;
const unsigned char* data_ptr = &src[(num_chunks * 3) + ((((n & 0x7) * 3) + 7) >> 3)];
if (n >= 19) { // if has at least 8 bytes of padding before data_ptr
data_ptr -= sizeof(uint64_t); // left aligned the truncated val in register (little endian)
while (num_chunks--) {
unsigned keys;
memcpy(&keys, key_ptr, 3);
key_ptr += 3;
for (size_t i = 0; i < 8; i++) {
uint64_t v;
size_t k = keys & 0x07;
size_t len = k + ((k + 1) >> 3); // k==7 is len=8
uint64_t mask = (uint64_t)0 - ((k + 7) >> 3); // k ? -1 : 0
size_t shift = (64 - (len * 8)) & (size_t)mask;
keys >>= 3;
data_ptr += len;
memcpy(&v, data_ptr, sizeof(v));
v &= mask;
*dst++ = (int64_t)v >> shift;
}
}
data_ptr += sizeof(uint64_t);
}
// TODO: tail loop...
}