I am testing kernel asynchronous io functions (not posix aio) and am trying to figure out how it works. The code below is a complete program where I simply write an array repeatedly to a file opened using O_DIRECT. I get an error in the callback function "write missed bytes expect 1024 got 0" (see the fprintf statement in work_done()).
For those not familiar with kernel aio, the code below does the following:
I get an error at step 5. If I do not open the file using O_DIRECT, things work fine, but it beats the purpose of having async writes. Can someone tell me what I am doing wrong? Is this the correct usage of kernel aio, for example, is my use of callbacks correct? Are there any restrictions on the usage of O_DIRECT?
I compile using 'gcc -Wall test.c -laio'
Thanks in advance.
/* 
 * File:   myaiocp.c
 * Author: kmehta
 *
 * Created on July 11, 2011, 12:50 PM
 *
 *
 * Testing kernel aio. 
 * Program creates a 2D matrix and writes it multiple times to create a file of desired size. 
 * Writes are performed using kernel aio functions (io_prep_pwrite, io_submit, etc.)
 */
#define _GNU_SOURCE
#define _XOPEN_SOURCE 600
#include <stdio.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <fcntl.h>
#include <string.h>
#include <sys/uio.h>
#include <sys/time.h>
#include <omp.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <libaio.h>
char ** buf;
long seg_size;
int seg_rows;
double total_size;
char * filename;
static int wait_count = 0;
void io_task();
void cleanup();
void allocate_2D_matrix(int[]);
int file_open(char *);
void wr_done(io_context_t ctx, struct iocb* iocb, long res, long res2);
int main(int argc, char **argv) {
    total_size  = 1048576;      //1MB
    seg_size    = 1024;         //1kB
    seg_rows    = 1024;
    filename    = "aio.out";
    int dims[] = {seg_rows, seg_size};
    allocate_2D_matrix(dims);   //Creates 2D matrix
    io_task();
    cleanup();
    return 0;
}
/*
 * Create a 2D matrix
 */
void allocate_2D_matrix(int dims[2]) {
    int i;
    char *data;
    //create the matrix
    data = (char *) calloc(1, dims[0] * dims[1] * sizeof (char));
    if (data == NULL) {
        printf("\nCould not allocate memory for matrix.\n");
        exit(1);
    }
    buf = (char **) malloc(dims[0] * sizeof (char *));
    if (buf == NULL) {
        printf("\nCould not allocate memory for matrix.\n");
        exit(1);
    }
    for (i = 0; i < dims[0]; i++) {
        buf[i] = &(data[i * dims[1]]);
    }
}
static void io_error(const char *func, int rc)
{
    if (rc == -ENOSYS)
        fprintf(stderr, "AIO not in this kernel\n");
    else if (rc < 0)
        fprintf(stderr, "%s: %s\n", func, strerror(-rc));
    else
        fprintf(stderr, "%s: error %d\n", func, rc);
    exit(1);
}
/*
 * Callback function
 */
static void work_done(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
    if (res2 != 0) {
        io_error("aio write", res2);
      }
      if (res != iocb->u.c.nbytes) {
            fprintf(stderr, "write missed bytes expect %lu got %ld\n",
                  iocb->u.c.nbytes, res2);
            exit(1);
      }
      wait_count --;
      printf("%d ", wait_count);
}
/*
 * Wait routine. Get events and call the callback function work_done()
 */
int io_wait_run(io_context_t ctx, long iter)
{
      struct io_event events[iter];
      struct io_event *ep;
      int ret, n;
      /*
       * get up to aio_maxio events at a time.
       */
      ret = n = io_getevents(ctx, iter, iter, events, NULL);
      printf("got %d events\n", n);
      /*
       * Call the callback functions for each event.
       */
      for (ep = events ; n-- > 0 ; ep++) {
            io_callback_t cb = (io_callback_t)ep->data ; struct iocb *iocb = ep->obj ; cb(ctx, iocb, ep->res, ep->res2);
      }
      return ret;
}
void io_task() {
    long offset = 0;
    int bufIndex = 0;
    //Open file
    int fd = file_open(filename);
    //Initialize structures
    long i; 
    long iter = total_size / seg_size;  //No. of iterations to reach desired file size (total_size)
    io_context_t myctx;
    if(0 != io_queue_init(iter, &myctx))
    {
        perror("Could not initialize io queue");
        exit(EXIT_FAILURE);
    }
    struct iocb * ioq[iter];
    //loop through iter times to reach desired file size
    for (i = 0; i < iter; i++) {
        struct iocb *io = (struct iocb*) malloc(sizeof (struct iocb));
        io_prep_pwrite(io, fd, buf[bufIndex], seg_size, offset);
        io_set_callback(io, work_done);
        ioq[i] = io;
        offset += seg_size;
        bufIndex ++;
        if (bufIndex > seg_rows - 1)    //If entire matrix written, start again from index 0
            bufIndex = 0;
    }
    printf("done preparing. Now submitting..\n");
    if(iter != io_submit(myctx, iter, ioq))
    {
        perror("Failure on submit");
        exit(EXIT_FAILURE);
    }
    printf("now awaiting completion..\n");
    wait_count = iter;
    int res;
    while (wait_count) {
        res = io_wait_run(myctx, iter);
        if (res < 0)
            io_error("io_wait_run", res);
    }
    close(fd);
}
void cleanup() {
    free(buf[0]);
    free(buf);
}
int file_open(char *filename) {
    int fd;
    if (-1 == (fd = open(filename, O_DIRECT | O_CREAT | O_WRONLY | O_TRUNC, 0666))) {
        printf("\nError opening file. \n");
        exit(-1);
    }
    return fd;
}
                Linux asynchronous I/O is a relatively recent addition to the Linux kernel. It's a standard feature of the 2.6 kernel, but you can find patches for 2.4. The basic idea behind AIO is to allow a process to initiate a number of I/O operations without having to block or wait for any to complete.
The Linux AIO model is used as follows: Open an I/O context to submit and reap I/O requests from. Create one or more request objects and set them up to represent the desired operation. Submit these requests to the I/O context, which will send them down to the device driver to process on the device.
The POSIX asynchronous I/O (AIO) interface allows applications to initiate one or more I/O operations that are performed asynchronously (i.e., in the background).
Interface. Internally it works by creating two buffers dubbed as "queue rings" (circular buffers) for storage of submission and completion of I/O requests (for storage devices, submission queue (SQ) and completion queue (CQ) respectively).
First of all, good job using libaio instead of POSIX aio.
Are there any restrictions on the usage of O_DIRECT ?
I'm not 100% sure this is the real problem, but O_DIRECT has some requirements (quoting mostly from TLPI):
posix_memalign)At a glance, I can see you are not taking aby precautions to align memory in allocate_2D_matrix.
If I do not open the file using O_DIRECT, things work fine, but it beats the purpose of having async writes.
This happens not to be the case. Asynchronous I/O works well without O_DIRECT (for instance think of the number of system calls slashed).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With