Single Producer / Consumer Ring Buffer in Shared Memory

Recently I've been playing about with using shared memory for IPC. One thing I've been trying to implement is a simple ring buffer with one process producing and one process consuming. Each process has its own sequence number to track its position, and these sequence numbers are updated with atomic ops so that the correct values are visible to the other process. The producer blocks once the ring buffer is full. The code is lock-free in the sense that no semaphores or mutexes are used.

Performance-wise I'm getting roughly 20 million messages per second on my rather modest VM. Pretty happy with that :)

What I'm curious about is how 'correct' my code is. Can anyone spot any inherent issues or race conditions? Here's my code; thanks in advance for any comments.

#include <stdlib.h>
#include <stdio.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <time.h>
#include <unistd.h>
#include <string.h>

#define SHM_ID "/mmap-test"
#define BUFFER_SIZE 4096
#define SLEEP_NANOS 1000   // 1 micro

struct Message
{
    long _id;
    char _data[128];
};

struct RingBuffer
{
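    // keep the two sequence numbers on separate cache lines to
    // avoid false sharing (the padding assumes 64-byte lines)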
    size_t _rseq;
    char _pad1[64];

    size_t _wseq;
    char _pad2[64];

    Message _buffer[BUFFER_SIZE];
};

void
producerLoop()
{
    int size = sizeof( RingBuffer );
    int fd = shm_open( SHM_ID, O_RDWR | O_CREAT, 0600 );
    if( fd == -1 ) {
        perror( "argh!!!" ); return;
    }
    ftruncate( fd, size );

    // create shared memory area
    RingBuffer* rb = (RingBuffer*)mmap( 0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 );
    close( fd );

    // initialize our sequence numbers in the ring buffer
    rb->_wseq = rb->_rseq = 0;
    int i = 0;

    timespec tss;
    tss.tv_sec = 0;
    tss.tv_nsec = SLEEP_NANOS;

    while( 1 )
    {
        // keep producing while the buffer isn't full
        // (one slot is always left empty to distinguish full from empty)
        while( (rb->_wseq+1)%BUFFER_SIZE != rb->_rseq%BUFFER_SIZE )
        {
            // write the next entry and atomically update the write sequence number
            Message* msg = &rb->_buffer[rb->_wseq%BUFFER_SIZE];
            msg->_id = i++;
            __sync_fetch_and_add( &rb->_wseq, 1 );
        }

        // give consumer some time to catch up
        nanosleep( &tss, 0 );
    }
}

void
consumerLoop()
{
    int size = sizeof( RingBuffer );
    int fd = shm_open( SHM_ID, O_RDWR, 0600 );
    if( fd == -1 ) {
        perror( "argh!!!" ); return;
    }

    // lookup producers shared memory area
    RingBuffer* rb = (RingBuffer*)mmap( 0, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 );

    // local read sequence number and the last message id we saw
    size_t seq = 0;
    size_t pid = (size_t)-1;    // so the first expected id is 0

    timespec tss;
    tss.tv_sec = 0;
    tss.tv_nsec = SLEEP_NANOS;

    while( 1 )
    {
        // while there is data to consume
        while( seq%BUFFER_SIZE != rb->_wseq%BUFFER_SIZE )
        {
            // get the next message and validate the id
            // id should only ever increase by 1
            // quit immediately if not
            Message msg = rb->_buffer[seq%BUFFER_SIZE];
            if( msg._id != pid+1 ) {
                printf( "error: %d %d\n", msg._id, pid ); return;
            }
            pid = msg._id;
            ++seq;
        }

        // atomically update the read sequence in the ring buffer
        // making it visible to the producer
        __sync_lock_test_and_set( &rb->_rseq, seq );

        // wait for more data
        nanosleep( &tss, 0 );
    }
}

int
main( int argc, char** argv )
{
    if( argc != 2 ) {
        printf( "please supply args (producer/consumer)\n" ); return -1;
    } else if( strcmp( argv[1], "consumer" ) == 0 ) {
        consumerLoop();
    } else if( strcmp( argv[1], "producer" ) == 0 ) {
        producerLoop();
    } else {
        printf( "invalid arg: %s\n", argv[1] ); return -1;
    }
}
asked Apr 29 '13 by user1664098

1 Answer

Seems correct to me at first glance. I realize that you are happy with the performance, but a fun experiment might be to use something more lightweight than __sync_fetch_and_add. AFAIK it is a full memory barrier, which is expensive. Since there is a single producer and a single consumer, a release and a corresponding acquire operation should give you better performance. Facebook's Folly library has a single-producer/single-consumer queue that uses the new C++11 atomics here: https://github.com/facebook/folly/blob/master/folly/ProducerConsumerQueue.h
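
To give a concrete idea, here is a minimal sketch of what the push/pop paths could look like with C++11 atomics and release/acquire ordering. It reuses the Message and BUFFER_SIZE definitions from your code and changes the two sequence numbers to std::atomic<size_t>; the tryPush/tryPop names are mine, and it assumes std::atomic<size_t> is lock-free on your platform so it stays valid with the struct in shared memory:

#include <atomic>

struct RingBuffer
{
    std::atomic<size_t> _rseq;
    char _pad1[64];

    std::atomic<size_t> _wseq;
    char _pad2[64];

    Message _buffer[BUFFER_SIZE];
};

// producer side: returns false when the buffer is full
bool tryPush( RingBuffer* rb, const Message& msg )
{
    // relaxed is enough here: the producer is the only writer of _wseq
    size_t w = rb->_wseq.load( std::memory_order_relaxed );

    // acquire pairs with the consumer's release store of _rseq
    if( (w+1)%BUFFER_SIZE == rb->_rseq.load( std::memory_order_acquire )%BUFFER_SIZE )
        return false;

    rb->_buffer[w%BUFFER_SIZE] = msg;

    // release publishes the slot contents before the new sequence number
    rb->_wseq.store( w+1, std::memory_order_release );
    return true;
}

// consumer side: returns false when the buffer is empty
bool tryPop( RingBuffer* rb, Message& msg )
{
    // relaxed is enough here: the consumer is the only writer of _rseq
    size_t r = rb->_rseq.load( std::memory_order_relaxed );

    // acquire pairs with the producer's release store of _wseq
    if( r%BUFFER_SIZE == rb->_wseq.load( std::memory_order_acquire )%BUFFER_SIZE )
        return false;

    msg = rb->_buffer[r%BUFFER_SIZE];

    // release lets the producer reuse the slot only after the copy is done
    rb->_rseq.store( r+1, std::memory_order_release );
    return true;
}

On x86 a release store compiles to a plain MOV while __sync_fetch_and_add is a LOCK XADD, so this takes the full barrier out of the hot path. Note that this sketch also publishes _rseq on every pop, rather than batching the update the way your consumer loop does.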

answered by Rajiv