 

Fast IPC/Socket communication in Java/Python

Two processes (Java and Python) need to communicate in my application. I noticed that the socket communication takes 93% of the run time. Why is communication so slow? Should I be looking for alternatives to socket communication or can this be made faster?

Update: I discovered a simple fix. The buffered output stream was not really buffering: the PrintWriter is created with autoFlush set to true, so every println() flushes the stream. I now put all data into string buffers in both the client and server processes and write it to the socket in the flush method.
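A minimal sketch of the same batching idea on the Python side, assuming the line-based protocol of the test below: one cycle's lines are joined and handed to the socket in a single `sendall()` call instead of one write per line. `send_cycle` and `read_cycle` are hypothetical helper names, and `socket.socketpair()` stands in for the real Java connection so the example runs on its own.

```python
import socket


def send_cycle(sock, lines):
    """Join all lines of one cycle and send them with one sendall() call."""
    payload = "".join(line + "\n" for line in lines).encode("ascii")
    sock.sendall(payload)  # one call instead of len(lines) separate writes
    return len(payload)


def read_cycle(fid):
    """Read lines until the '.' terminator, as the test client does."""
    lines = []
    while True:
        line = fid.readline()
        if not line:  # peer closed the connection
            break
        line = line.rstrip("\n")
        lines.append(line)
        if line == ".":
            break
    return lines


if __name__ == "__main__":
    a, b = socket.socketpair()
    with a, b, b.makefile("r") as fid:
        sent = send_cycle(a, ["+"] * 100 + ["."])
        received = read_cycle(fid)
        print(sent, len(received))  # 202 101
```

The Java equivalent is to build the whole cycle in one buffer (or turn autoFlush off) and flush once per cycle.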

I'm still interested in an example of the usage of shared memory to exchange data quickly between processes.

Some additional information:

  1. Message size in the application is under 64 KB most of the time.
  2. The server is in Java, the client is written in Python.
  3. Socket IPC is implemented below: sending about 200 bytes per cycle for 50 cycles takes far too long. If I send 2 bytes per cycle for 5000 cycles instead, it takes a lot less time.
  4. Both processes run on one Linux machine.
  5. In the real application, about 10 calls to the client's iFid.write() are made each cycle.
  6. This is done on a Linux system.

This is the server side:

import java.io.*;
import java.net.*;
import java.util.Date;

public class FastIPC{
    public PrintWriter out;
    BufferedReader in;
    Socket socket = null;
    ServerSocket serverSocket = null;


    public FastIPC(int port) throws Exception{
        serverSocket = new ServerSocket(port);
        socket = serverSocket.accept();
        // autoFlush=true: every println() also flushes, so each line becomes a separate socket write
        out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true);
        in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
    }

    public void send(String msg){
        out.println(msg); // send price update to socket
    }

    public void flush(){
        out.flush();
    }

    public String recv() throws Exception{
        return in.readLine();
    }

    public static void main(String[] args){
        int port = 32000;
        try{
            FastIPC fip = new FastIPC(port);
            long start = new Date().getTime();
            System.out.println("Connected.");
            for (int i=0; i<50; i++){
                for(int j=0; j<100; j++)
                    fip.send("+");
                fip.send(".");
                fip.flush();
                String msg = fip.recv();
            }
            long stop = new Date().getTime();
            System.out.println((double)(stop - start)/1000.);
        }catch(Exception e){
            e.printStackTrace();  // report the failure instead of exiting silently
            System.exit(1);
        }
    }
}

And the client side is:

import socket

class IPC(object):
    def __init__(self):
        self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.s.connect(("localhost", 32000))
        self.rfid = self.s.makefile('r')  # file wrapper to read lines
        self.wfid = self.s.makefile('w')  # separate wrapper for writing replies
        self.listenLoop()  # wait listening for updates from server

    def listenLoop(self):
        print("connected")
        while True:
            while True:
                line = self.rfid.readline()
                if not line:  # server closed the connection
                    return
                if line[0] == '.':
                    break
            self.wfid.write('.\n')
            self.wfid.flush()

if __name__ == '__main__':
    st = IPC()
asked Feb 12 '12 16:02 by fodon


2 Answers

You have a number of options. Since you are using Linux, you could use UNIX domain sockets. Or you could serialise the data as ASCII, JSON or some other format and feed it through a pipe, SHM (shared memory segment), message queue, D-Bus or similar. It's worth thinking about what sort of data you have, as these IPC mechanisms have different performance characteristics. There's a draft USENIX paper with a good analysis of the various trade-offs that is worth reading.
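As an illustration of the first option, here is a minimal sketch of a line-based request/reply exchange over a UNIX domain socket. Both ends are written in Python so it runs standalone; in the asker's setup one end would be the Java server (Java 16 and later support UNIX domain sockets through `SocketChannel` and `UnixDomainSocketAddress`). The socket path, the upper-casing echo server and the names `serve_once`/`round_trip` are hypothetical.

```python
import os
import socket
import tempfile
import threading


def serve_once(server):
    """Accept one client and reply to each line with its upper-case echo."""
    conn, _ = server.accept()
    with conn, conn.makefile("r") as rfile:
        for line in rfile:  # ends when the client closes its side
            conn.sendall(line.upper().encode())


def round_trip(messages):
    """Send each message as a line over a UNIX domain socket; return the replies."""
    tmpdir = tempfile.mkdtemp()
    path = os.path.join(tmpdir, "ipc.sock")  # hypothetical socket path
    server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    server.bind(path)
    server.listen(1)
    worker = threading.Thread(target=serve_once, args=(server,), daemon=True)
    worker.start()
    replies = []
    try:
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as client:
            client.connect(path)
            with client.makefile("r") as rfile:
                for msg in messages:
                    client.sendall((msg + "\n").encode())
                    replies.append(rfile.readline().rstrip("\n"))
    finally:
        worker.join(timeout=5)
        server.close()
        os.unlink(path)
        os.rmdir(tmpdir)
    return replies


if __name__ == "__main__":
    print(round_trip(["ping", "price 42"]))  # ['PING', 'PRICE 42']
```

Compared with the TCP version, the only change is the address family and a filesystem path instead of a port; the data never goes through the loopback TCP stack.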

Since you say (in the comments to this answer) that you prefer to use SHM, here are some code samples to start you off. Using the Python posix_ipc library:

import mmap       # From Python stdlib
import os
import pickle
import posix_ipc  # POSIX-specific IPC (third-party package)

class SharedMemory(object):
    """Python interface to shared memory.
    The create argument tells the object to create a new SHM object,
    rather than attaching to an existing one.
    """

    def __init__(self, name, size=posix_ipc.PAGE_SIZE, create=True):
        self.name = name
        self.size = size
        if create:
            memory = posix_ipc.SharedMemory(self.name, posix_ipc.O_CREX,
                                            size=self.size)
        else:
            memory = posix_ipc.SharedMemory(self.name)
        self.mapfile = mmap.mmap(memory.fd, memory.size)
        os.close(memory.fd)  # the mapping stays valid after the fd is closed

    def put(self, item):
        """Put item in shared memory."""
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        pickle.dump(item, self.mapfile, protocol=2)

    def get(self):
        """Get a Python object from shared memory."""
        # TODO: Deal with the case where len(item) > size(self.mapfile)
        # TODO: Guard this method with a named semaphore
        self.mapfile.seek(0)
        return pickle.load(self.mapfile)

    def __del__(self):
        try:
            self.mapfile.close()
            memory = posix_ipc.SharedMemory(self.name)
            memory.unlink()
        except Exception:
            pass
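The class above maps the descriptor returned by posix_ipc with `mmap.mmap(memory.fd, ...)`. A small stdlib-only illustration of that relationship (a sketch, not part of posix_ipc): bytes written through a file descriptor with `os.lseek`/`os.write` are visible through a shared `mmap` of the same descriptor. It assumes Linux-like behaviour and uses `/dev/shm` when present, otherwise an ordinary temporary file; `fd_and_mmap_demo` is a hypothetical name.

```python
import mmap
import os
import tempfile


def fd_and_mmap_demo(message: bytes, size: int = 4096) -> bytes:
    """Write via the fd, then read the same bytes through a shared mapping."""
    shm_dir = "/dev/shm" if os.path.isdir("/dev/shm") else None  # RAM-backed on Linux
    fd, path = tempfile.mkstemp(dir=shm_dir)
    try:
        os.ftruncate(fd, size)             # give the object its length first
        with mmap.mmap(fd, size) as view:  # MAP_SHARED by default on Unix
            os.lseek(fd, 0, os.SEEK_SET)
            os.write(fd, message)          # plain file-descriptor write
            return view[:len(message)]     # same bytes, seen through the mapping
    finally:
        os.close(fd)
        os.unlink(path)


if __name__ == "__main__":
    print(fd_and_mmap_demo(b"hello shm"))  # b'hello shm'
```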

For the Java side you want to create the same class. Despite what I said in the comments, JTux seems to provide the equivalent functionality; the API you need is in the UPosixIPC class.

The code below is an outline of the sort of thing you need to implement. Several things are missing: exception handling is the obvious one, some flags still need filling in (find them in UConstant), and you'll want to add a semaphore to guard the put/get methods. However, this should set you on the right track. Remember that a shared memory object is a file-like interface to a segment of RAM, so you can use the file descriptor returned by shm_open as if it were the fd of a normal file.

import jtux.*;

class SHM {

    private String name;
    private int size;
    private int fd;          // descriptor returned by shm_open
    private long semaphore;  // TODO: use a named semaphore to guard put/get
    private long mapfile;    // address of the mapping returned by mmap

    /* Lookup flags and perms in your system docs */
    public SHM(String name, int size, boolean create, int flags, int perms) {
        this.name = name;
        this.size = size;
        flags = flags | UConstant.O_RDWR;  // open for reading and writing
        if (create) {
            flags = flags | UConstant.O_CREAT;
            // TODO: a new object has length 0; size it (ftruncate) before use
        }
        this.fd = UPosixIPC.shm_open(name, flags, perms);  // third argument is the permission mode
        // NOTE: mmap's fourth argument takes MAP_* flags (e.g. MAP_SHARED), not the shm_open flags
        this.mapfile = UPosixIPC.mmap(..., this.size, ..., flags, this.fd, 0);
    }


    public void put(String item) {
        byte[] bytes = item.getBytes();
        UFile.lseek(this.fd, 0, 0);  // whence 0 == SEEK_SET
        UFile.write(this.fd, bytes, bytes.length);
    }


    public String get() {
        UFile.lseek(this.fd, 0, 0);  // whence 0 == SEEK_SET
        byte[] buffer = new byte[this.size];
        UFile.read(this.fd, buffer, buffer.length);
        return new String(buffer);
    }


    public void finalize() {
        UPosixIPC.munmap(this.mapfile, this.size);
        UPosixIPC.shm_unlink(this.name);
    }

}
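Back on the Python side: if installing posix_ipc is not an option, Python 3.8+ ships `multiprocessing.shared_memory`, which on Linux is backed by POSIX shared memory. The sketch below is that swapped-in stdlib variant, not the class above. It stores a 4-byte length header before the pickled payload so the reader knows how many bytes to load, and it rejects items that do not fit (the first TODO). It still has no semaphore, so concurrent put/get calls can race. `ShmSlot` is a hypothetical name.

```python
import pickle
import struct
from multiprocessing import shared_memory

HEADER = struct.Struct("<I")  # 4-byte little-endian payload length


class ShmSlot:
    """One named shared-memory slot holding a single pickled object."""

    def __init__(self, name=None, size=4096, create=True):
        # name=None with create=True lets the library pick a unique name
        self.shm = shared_memory.SharedMemory(
            name=name, create=create, size=size if create else 0)
        self.owner = create

    def put(self, item):
        data = pickle.dumps(item, protocol=pickle.HIGHEST_PROTOCOL)
        if HEADER.size + len(data) > self.shm.size:
            raise ValueError("item too large for shared memory segment")
        self.shm.buf[HEADER.size:HEADER.size + len(data)] = data  # payload first
        HEADER.pack_into(self.shm.buf, 0, len(data))              # then its length

    def get(self):
        (length,) = HEADER.unpack_from(self.shm.buf, 0)
        if length == 0:  # a fresh segment is zero-filled
            raise LookupError("nothing stored yet")
        return pickle.loads(bytes(self.shm.buf[HEADER.size:HEADER.size + length]))

    def close(self):
        self.shm.close()
        if self.owner:
            self.shm.unlink()


if __name__ == "__main__":
    writer = ShmSlot(size=4096, create=True)
    reader = ShmSlot(writer.shm.name, create=False)  # normally in another process
    writer.put({"price": 101.5, "symbol": "XYZ"})
    print(reader.get())
    reader.close()
    writer.close()
```

The segment name (`writer.shm.name`) is what the other process needs in order to attach.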
answered Oct 19 '22 23:10 by snim2


Some thoughts

  • The server is in Java, the client is written in Python.

An odd combination, but is there any reason one cannot start the other and communicate via stdin and stdout?
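A minimal sketch of that suggestion: the parent starts the other program as a child process and exchanges newline-terminated messages over the child's stdin/stdout pipes. To keep the example self-contained, the child here is a tiny Python echo loop passed with `-c`; in the real setup it would be the Java program. `CHILD` and `talk` are hypothetical names.

```python
import subprocess
import sys

# Hypothetical child: answers each line with "ack:<line>" and flushes at once.
CHILD = r"""
import sys
for line in sys.stdin:
    sys.stdout.write("ack:" + line)
    sys.stdout.flush()
"""


def talk(messages):
    """Send each message to the child over its stdin and collect the replies."""
    with subprocess.Popen([sys.executable, "-c", CHILD],
                          stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                          text=True) as proc:
        replies = []
        for msg in messages:
            proc.stdin.write(msg + "\n")
            proc.stdin.flush()  # push the line through the pipe now
            replies.append(proc.stdout.readline().rstrip("\n"))
        proc.stdin.close()      # EOF lets the child's loop end
    return replies


if __name__ == "__main__":
    print(talk(["hello", "42"]))  # ['ack:hello', 'ack:42']
```

Both sides must flush after each message; otherwise a reply can sit in a pipe buffer while the other side waits for it.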

  • Socket IPC is implemented below: it takes 50 cycles sending 200 bytes ! This has got to be too high. If I send 2 bytes in 5000 cycles, it takes a lot less time.

Any call to the OS is going to be relatively slow (latency-wise). Using shared memory can bypass the kernel. If throughput is your issue, I have found you can reach 1-2 GB/s using sockets, provided latency isn't as much of a concern for you.

  • Both processes run on one Linux machine.

Making shared memory ideal.

  • In the real application about 10 calls to client's iFid.write() are made each cycle.

Not sure why this is the case. Why not build a single structure/buffer and write it once? I would use a direct buffer in NIO to minimise latency. Character translation is pretty expensive, especially if you only need ASCII.
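The single-buffer suggestion is phrased in terms of Java NIO. Since the client here is Python, this is only a rough analogue, not NIO itself: a preallocated `bytearray` is filled in place with `struct.pack_into` (binary fields, no text encoding) and sent with one `sendall()` per batch. The record layout (a 4-byte count followed by `int32 id, float64 price` pairs) and the names `BatchWriter`, `read_batch` and `recv_exact` are hypothetical.

```python
import socket
import struct

COUNT = struct.Struct("<I")    # number of records in this batch
RECORD = struct.Struct("<id")  # hypothetical record: int32 id, float64 price


class BatchWriter:
    """Reuses one preallocated buffer; each flush() is a single sendall()."""

    def __init__(self, sock, max_records=64):
        self.sock = sock
        self.max_records = max_records
        self.buf = bytearray(COUNT.size + RECORD.size * max_records)
        self.n = 0

    def add(self, ident, price):
        if self.n == self.max_records:
            self.flush()  # buffer full: send what we have first
        RECORD.pack_into(self.buf, COUNT.size + self.n * RECORD.size, ident, price)
        self.n += 1

    def flush(self):
        if self.n == 0:
            return
        COUNT.pack_into(self.buf, 0, self.n)
        end = COUNT.size + self.n * RECORD.size
        self.sock.sendall(memoryview(self.buf)[:end])  # no copy, one call
        self.n = 0


def recv_exact(sock, n):
    """Read exactly n bytes or raise EOFError."""
    data = bytearray()
    while len(data) < n:
        chunk = sock.recv(n - len(data))
        if not chunk:
            raise EOFError("socket closed mid-message")
        data += chunk
    return bytes(data)


def read_batch(sock):
    (n,) = COUNT.unpack(recv_exact(sock, COUNT.size))
    data = recv_exact(sock, n * RECORD.size)
    return [RECORD.unpack_from(data, i * RECORD.size) for i in range(n)]


if __name__ == "__main__":
    a, b = socket.socketpair()
    with a, b:
        w = BatchWriter(a)
        w.add(1, 101.25)
        w.add(2, 99.5)
        w.flush()
        print(read_batch(b))  # [(1, 101.25), (2, 99.5)]
```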

  • This is done on a Linux system.

Should be easy to optimise.

I use shared memory via memory-mapped files, because I need to record every message for auditing purposes. I get an average round-trip latency of around 180 ns, sustained over millions of messages, and about 490 ns in a real application.

One advantage of this approach is that if there are short delays, the reader can catch up with the writer very quickly. It also supports restart and replication easily.

This is only implemented in Java, but the principle is simple enough that I am sure it would work in Python as well.
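A rough Python illustration of that principle follows; it is not Chronicle's file format or API. Length-prefixed records are appended to a memory-mapped file, and each reader keeps its own offset, so after a delay one `poll()` returns everything written since it last looked. Because the file persists, a restarted reader can start again from offset 0 and a restarted writer scans to the end before appending. It assumes a single writer, a fixed capacity, and that a zero length header marks unwritten space. The body is written before its length header, but the sketch makes no formal cross-process memory-ordering guarantees. All names and the file path are hypothetical.

```python
import mmap
import os
import struct
import tempfile

LEN = struct.Struct("<I")  # 4-byte length header; 0 marks unwritten space


def scan(buf, pos):
    """Return (records, new_pos) for all complete records from pos onward."""
    records = []
    while pos + LEN.size <= len(buf):
        (n,) = LEN.unpack_from(buf, pos)
        if n == 0:
            break
        start = pos + LEN.size
        records.append(buf[start:start + n])
        pos = start + n
    return records, pos


class LogWriter:
    """Single writer appending to a fixed-size memory-mapped file."""

    def __init__(self, path, capacity=1 << 16):
        fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o600)
        try:
            if os.fstat(fd).st_size < capacity:
                os.ftruncate(fd, capacity)  # the extension reads as zeros
            self.buf = mmap.mmap(fd, capacity)
        finally:
            os.close(fd)                    # the mapping stays valid
        _, self.pos = scan(self.buf, 0)     # on restart, resume at the end

    def append(self, payload: bytes):
        end = self.pos + LEN.size + len(payload)
        if not payload or end > len(self.buf):
            raise ValueError("payload empty or log full")
        self.buf[self.pos + LEN.size:end] = payload      # write the body first,
        LEN.pack_into(self.buf, self.pos, len(payload))  # then publish its length
        self.pos = end


class LogReader:
    """Independent reader with its own offset; poll() catches up on new records."""

    def __init__(self, path):
        with open(path, "rb") as f:
            self.buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        self.pos = 0

    def poll(self):
        records, self.pos = scan(self.buf, self.pos)
        return records


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "events.log")  # hypothetical path
    writer = LogWriter(path)
    reader = LogReader(path)
    for i in range(3):
        writer.append(b"msg %d" % i)
    print(reader.poll())  # [b'msg 0', b'msg 1', b'msg 2']
    print(reader.poll())  # []
```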

https://github.com/peter-lawrey/Java-Chronicle

answered Oct 20 '22 00:10 by Peter Lawrey