Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create bi-directional messaging using AMP in Twisted/Python

I'm trying to create a twisted daemon(the server part) that communicates with my local code base (the client part). Basically, the client is supposed to callRemote() using AMP to the daemon to start some processing (updating the database) some methods. After each method is finished processing on the server, I need the server to callRemote() to my client so that the user knows the progress of the server.

I've been able to call to the server from the client and get a response but I can't get the server to send a response to the client.

I've googled for a solution but I can't find any example code that uses AMP for bi-directional communication--it's always the client calling to the server.

I'm trying to get the client to call to the server to start processing (the ServerStart AMP command) and then have the server send multiple calls back to the client to provide updates on processing (the MessageClient AMP command).

Any help would be GREATLY appreciated. A super simple example that shows how to call to the server from the client and then have the server pass back two calls to the client would be awesome!

ampclient.py

from client_server import MessageServer, Client, ServerStart
from twisted.internet.protocol import ClientCreator
from twisted.internet import reactor
from twisted.protocols import amp
from time import sleep
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

def startServerProcess():
    def show_start(result):
        print 'result from server: %r' % result

    d = ClientCreator(reactor, amp.AMP).connectTCP(
        '127.0.0.1', 1234).addCallback(
            lambda p: p.callRemote(ServerStart, truncate=True)).addCallback(
                show_start)    

pf = Factory()
pf.protocol = Client
reactor.listenTCP(1235, pf)
print 'client listening'

startServerProcess()

sleep(4)

reactor.run()

ampserver.py

from client_server import MessageClient, Server
from twisted.internet.protocol import ClientCreator
from twisted.internet import reactor
from twisted.protocols import amp
from time import sleep
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

def makeClientCall():
    def show_result(result):
        print 'result from client: %r' % result     

    d = ClientCreator(reactor, amp.AMP).connectTCP(
        '127.0.0.1', 1235).addCallback(
            lambda p: p.callRemote(MessageClient)).addCallback(
                show_result)


application = Application("server app")

endpoint = TCP4ServerEndpoint(reactor, 1234)
factory = Factory()
factory.protocol = Server
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

sleep(4)

makeClientCall()
makeClientCall()

client_server.py

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

class MessageServer(amp.Command):
    response = [('msg', amp.String())]

class ServerStart(amp.Command):
    arguments = [('truncate', amp.Boolean())]
    response = [('msg', amp.String())]

class Server(amp.AMP):
    def message_it(self):
        msg = 'This is a message from the server'
        print 'msg sent to client: %s' % msg
        return {'msg': msg}
    MessageServer.responder(message_it)

    def start_it(self, truncate):
        msg = 'Starting processing...'
        return {'msg': msg}
    ServerStart.responder(start_it)



class MessageClient(amp.Command):
    response = [('msg', amp.String())]

class Client(amp.AMP):
    def message_it(self):
        msg = 'This is a message from the client'
        return {'msg': msg}
    MessageClient.responder(message_it)
like image 703
Scott Avatar asked Mar 26 '13 15:03

Scott


2 Answers

Here's a simple example of a bi-directional AMP client and server. The key is that the AMP protocol class holds a reference to the client connection and provides a callRemote method.

Of course, I only know this from digging through the AMP code. Twisted documentation is lacking at best, at least outside the core.

File: count_server.tac

from twisted.protocols.amp import AMP
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService

from count_client import Counter

application = Application('test AMP server')

endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = Counter
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)

File: count_client.py

if __name__ == '__main__':
    import count_client
    raise SystemExit(count_client.main())

from sys import stdout

from twisted.python.log import startLogging, err
from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ClientEndpoint

class Count(amp.Command):
    arguments = [('n', amp.Integer())]
    response = [('ok', amp.Boolean())]

class Counter(amp.AMP):
    @Count.responder
    def count(self, n):
        print 'received:', n
        n += 1

        if n < 10:
            print 'sending:', n
            self.callRemote(Count, n=n)

        return {'ok': True}

def connect():
    endpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', 8750)
    factory = Factory()
    factory.protocol = Counter
    return endpoint.connect(factory)

def main():
    startLogging(stdout)

    d = connect()
    d.addErrback(err, 'connection failed')
    d.addCallback(lambda p: p.callRemote(Count, n=1))
    d.addErrback(err, 'call failed')

    reactor.run()

Server output:

$ twistd -n -y count_server.tac
2013-03-27 11:05:18-0500 [-] Log opened.
2013-03-27 11:05:18-0500 [-] twistd 12.2.0 (/usr/bin/python 2.7.3) starting up.
2013-03-27 11:05:18-0500 [-] reactor class: twisted.internet.epollreactor.EPollReactor.
2013-03-27 11:05:18-0500 [-] Factory starting on 8750
2013-03-27 11:05:18-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:22-0500 [twisted.internet.protocol.Factory] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 1
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 2
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 3
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 4
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 5
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 6
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 7
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 8
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 9
2013-03-27 11:05:26-0500 [Counter,0,127.0.0.1] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
^C2013-03-27 11:05:31-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:31-0500 [-] (TCP Port 8750 Closed)
2013-03-27 11:05:31-0500 [-] Stopping factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:31-0500 [-] Main loop terminated.
2013-03-27 11:05:31-0500 [-] Server Shut Down.

Client output:

$ python count_client.py
2013-03-27 11:05:22-0500 [-] Log opened.
2013-03-27 11:05:22-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x246bf80>
2013-03-27 11:05:22-0500 [Uninitialized] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750))
2013-03-27 11:05:22-0500 [Counter,client] received: 2
2013-03-27 11:05:22-0500 [Counter,client] sending: 3
2013-03-27 11:05:22-0500 [Counter,client] received: 4
2013-03-27 11:05:22-0500 [Counter,client] sending: 5
2013-03-27 11:05:22-0500 [Counter,client] received: 6
2013-03-27 11:05:22-0500 [Counter,client] sending: 7
2013-03-27 11:05:22-0500 [Counter,client] received: 8
2013-03-27 11:05:22-0500 [Counter,client] sending: 9
^C2013-03-27 11:05:26-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:26-0500 [Counter,client] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750))
2013-03-27 11:05:26-0500 [Counter,client] Stopping factory <twisted.internet.protocol.Factory instance at 0x246bf80>
2013-03-27 11:05:26-0500 [-] Main loop terminated.
like image 164
kitti Avatar answered Nov 13 '22 16:11

kitti


The previous response by Ryan P leaves something to be desired. In particular, it never actually used the AMP response, preferring instead to chain callRemote calls everywhere. Here's my answer, based on the ampserver.py (unchanged) and ampclient.py (rewritten) examples in twisted. This answers the underlying question on bi-directional messaging in a way which fits the question (though not exactly the description).

Short summary, add a callback to the deferred which you get from callRemote, which will have the response(s) in its argument. It's a normal dictionary so you can do what you want with it.

File: ampserver.py

from twisted.protocols import amp

class Sum(amp.Command):
    arguments = [('a', amp.Integer()),
                 ('b', amp.Integer())]
    response = [('total', amp.Integer())]


class Divide(amp.Command):
    arguments = [('numerator', amp.Integer()),
                 ('denominator', amp.Integer())]
    response = [('result', amp.Float())]
    errors = {ZeroDivisionError: 'ZERO_DIVISION'}


class Math(amp.AMP):
    def sum(self, a, b):
        total = a + b
        print 'Did a sum: %d + %d = %d' % (a, b, total)
        return {'total': total}
    Sum.responder(sum)

    def divide(self, numerator, denominator):
        result = float(numerator) / denominator
        print 'Divided: %d / %d = %f' % (numerator, denominator, result)
        return {'result': result}
    Divide.responder(divide)


def main():
    from twisted.internet import reactor
    from twisted.internet.protocol import Factory
    pf = Factory()
    pf.protocol = Math
    reactor.listenTCP(1234, pf)
    print 'started'
    reactor.run()

if __name__ == '__main__':
    main()

File: ampclient.py

from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater
from twisted.protocols import amp
from ampserver import Sum, Divide


connection = None

class MathClient(amp.AMP):
    def connectionMade(self):
        global connection
        connection = self


class MathFactory(protocol.ReconnectingClientFactory):
    protocol = MathClient


if __name__ == '__main__':
    reactor.connectTCP('127.0.0.1', 1234, MathFactory())
    def simpleSum():
        global connection
        d = connection.callRemote(Sum, a=1, b=5)
        def prin(result):
            print(result)
        d.addCallback(prin)
        return d
    deferLater(reactor, 1, simpleSum)
    deferLater(reactor, 3, simpleSum)
    deferLater(reactor, 6, simpleSum)
    deferLater(reactor, 9, simpleSum)
    deferLater(reactor, 12, simpleSum)
    deferLater(reactor, 15, simpleSum)
    deferLater(reactor, 18, simpleSum).addCallback(lambda _: reactor.stop())
    reactor.run()
like image 37
Ng Oon-Ee Avatar answered Nov 13 '22 15:11

Ng Oon-Ee