Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use inproc transport with pyzmq?

I have set up two small scripts imitating a publish and subscribe procedure with pyzmq. However, I am unable to send messages over to my subscriber client using the inproc transport. I am able to use tcp://127.0.0.1:8080 fine, just not inproc.

pub_server.py

import zmq
import random
import sys
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("inproc://stream")

while True:
    socket.send_string("Hello")
    time.sleep(1)

sub_client.py

import sys
import zmq

# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.setsockopt_string(zmq.SUBSCRIBE, '')
socket.connect("inproc://stream")

for x in range (5):
    string = socket.recv()
    print(string)

How can I successfully alter my code so that I'm able to use the inproc transport method between my two scripts?

EDIT:

I have updated my code to further reflect @larsks comment. I am still not receiving my published string - what is it that I am doing wrong?

import threading
import zmq

def pub():
    context = zmq.Context()
    sender = context.socket(zmq.PUB)
    sender.connect("inproc://hello")
    lock = threading.RLock()

    with lock:
        sender.send(b"")

def sub():
    context = zmq.Context()
    receiver = context.socket(zmq.SUB)
    receiver.bind("inproc://hello")

    pub()

    # Wait for signal
    string = receiver.recv()
    print(string)
    print("Test successful!")

    receiver.close()

if __name__ == "__main__":
    sub()
like image 935
juiceb0xk Avatar asked Sep 16 '25 01:09

juiceb0xk


1 Answers

As the name implies, inproc sockets can only be used within the same process. If you were to rewrite your client and server such that there were two threads in the same process you could use inproc, but otherwise this socket type simply isn't suitable for what you're doing.

The documentation is very clear on this point:

The in-process transport passes messages via memory directly between threads sharing a single ØMQ context.

Update

Taking a look at the updated code, the problem that stands out first is that while the documentation quoted above says "...between threads sharing a single ØMQ context", you are creating two contexts in your code. Typically, you will only call zmq.Context() once in your program.

Next, you are never subscribing your subscriber to any messages, so even in the event that everything else was working correctly you would not actually receive any messages.

Lastly, your code is going to experience the slow joiner problem:

There is one more important thing to know about PUB-SUB sockets: you do not know precisely when a subscriber starts to get messages. Even if you start a subscriber, wait a while, and then start the publisher, the subscriber will always miss the first messages that the publisher sends. This is because as the subscriber connects to the publisher (something that takes a small but non-zero time), the publisher may already be sending messages out.

The pub/sub model isn't meant for single messages, nor is it meant to be a reliable transport.

So, to sum up:

  • You need to create a shared ZMQ context before you creating your sockets.
  • You probably want your publisher to publish in a loop instead of publishing a single message. Since you're trying to use inproc sockets you're going to need to put your two functions into separate threads.
  • You need to set a subscription filter in order to receive messages.

There is an example using PAIR sockets in the ZMQ documentation that might provide a useful starting point. PAIR sockets are designed for coordinating threads over inproc sockets, and unlike pub/sub sockets they are bidirectional and are not impacted by the "slow joiner" issue.

like image 125
larsks Avatar answered Sep 19 '25 03:09

larsks