Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

tidy code for asynchronous IO

Whilst asynchronous IO (non-blocking descriptors with select/poll/epoll/kqueue etc) is not the most documented thing on the web, there are a handful of good examples.

However, all these examples, having determined the handles that are returned by the call, just have a 'do_some_io(fd)' stub. They don't really explain how to best approach the actual asynchronous IO in such a method.

Blocking IO is very tidy and straightforward to read code. Non-blocking, async IO is, on the other hand, hairy and messy.

What approaches are there? What are robust and readable?

void do_some_io(int fd) {   switch(state) {     case STEP1:        ... async calls        if(io_would_block)           return;        state = STEP2;     case STEP2:        ... more async calls        if(io_would_block)           return;        state = STEP3;     case STEP3:        ...   } } 

or perhaps (ab)using GCC's computed gotos:

#define concatentate(x,y) x##y #define async_read_xx(var,bytes,line)       \    concatentate(jmp,line):                  \    if(!do_async_read(bytes,&var)) {         \        schedule(EPOLLIN);                   \        jmp_read = &&concatentate(jmp,line); \        return;                              \ }  // macros for making async code read like sync code #define async_read(var,bytes) \     async_read_xx(var,bytes,__LINE__)  #define async_resume()            \      if(jmp_read) {               \          void* target = jmp_read; \          jmp_read = NULL;         \          goto *target;            \      }  void do_some_io() {    async_resume();    async_read(something,sizeof(something));    async_read(something_else,sizeof(something_else)); } 

Or perhaps C++ exceptions and a state machine, so worker functions can trigger the abort/resume bit, or perhaps a table-driven state-machine?

Its not how to make it work, its how to make it maintainable that I'm chasing!

like image 610
Will Avatar asked May 19 '09 14:05

Will


People also ask

What is asynchronous IO?

In computer science, asynchronous I/O (also non-sequential I/O) is a form of input/output processing that permits other processing to continue before the transmission has finished.

How asynchronous IO increases the system efficiency?

Data copies from and to user buffers are asynchronous to the application that initiates the request. This overlapped processing makes efficient use of multiple processors and in many cases improves paging rates because system buffers are freed for reuse when data arrives.


2 Answers

I suggest take a look on: http://www.kegel.com/c10k.html, second take a look on existing libraries like libevent, Boost.Asio that already do the job and see how they work.

The point is that the approach may be different for each type of system call:

  • select is simple reactor
  • epoll have both edge or level triggered interface that require different approach
  • iocp is proactor require other approach

Suggestion: use good existing library like Boost.Asio for C++ or libevent for C.

EDIT: This is how ASIO handles this

class connection {    boost::asio:ip::tcp::socket socket_; public:    void run()    {          // for variable length chunks          async_read_until(socket_,resizable_buffer,'\n',                boost::bind(&run::on_line_recieved,this,errorplacehplder);          // or constant length chunks          async_read(socket_,buffer(some_buf,buf_size),                boost::bind(&run::on_line_recieved,this,errorplacehplder);    }    void on_line_recieved(error e)    {         // handle it         run();    }  }; 

Because ASIO works as proactor it notifies you when operation is complete and handles EWOULDBLOCK internally.

If you word as reactor you may simulate this behavior:

 class conn {     // Application logic      void run() {        read_chunk(&conn::on_chunk_read,size);     }     void on_chunk_read() {          /* do something;*/     }      // Proactor wrappers      void read_chunk(void (conn::*callback),int size, int start_point=0) {        read(socket,buffer+start,size)        if( complete )           (this->*callback()        else {           this -> tmp_size-=size-read;           this -> tmp_start=start+read;           this -> tmp_callback=callback           your_event_library_register_op_on_readable(callback,socket,this);        }     }     void callback()     {        read_chunk(tmp_callback,tmp_size,tmp_start);     }  } 

Something like that.

like image 133
Artyom Avatar answered Sep 22 '22 16:09

Artyom


State machines are one nice approach. It's a bit of complexity up front that'll save you headaches in the future, where the future starts really, really soon. ;-)

Another method is to use threads and do blocking I/O on a single fd in each thread. The trade-off here is that you make I/O simple but may introduce complexity in synchronization.

like image 34
dwc Avatar answered Sep 21 '22 16:09

dwc