I'm trying to create a twisted daemon(the server part) that communicates with my local code base (the client part). Basically, the client is supposed to callRemote() using AMP to the daemon to start some processing (updating the database) some methods. After each method is finished processing on the server, I need the server to callRemote() to my client so that the user knows the progress of the server.
I've been able to call to the server from the client and get a response but I can't get the server to send a response to the client.
I've googled for a solution but I can't find any example code that uses AMP for bi-directional communication--it's always the client calling to the server.
I'm trying to get the client to call to the server to start processing (the ServerStart AMP command) and then have the server send multiple calls back to the client to provide updates on processing (the MessageClient AMP command).
Any help would be GREATLY appreciated. A super simple example that shows how to call to the server from the client and then have the server pass back two calls to the client would be awesome!
ampclient.py
from client_server import MessageServer, Client, ServerStart
from twisted.internet.protocol import ClientCreator
from twisted.internet import reactor
from twisted.protocols import amp
from time import sleep
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService
def startServerProcess():
def show_start(result):
print 'result from server: %r' % result
d = ClientCreator(reactor, amp.AMP).connectTCP(
'127.0.0.1', 1234).addCallback(
lambda p: p.callRemote(ServerStart, truncate=True)).addCallback(
show_start)
pf = Factory()
pf.protocol = Client
reactor.listenTCP(1235, pf)
print 'client listening'
startServerProcess()
sleep(4)
reactor.run()
ampserver.py
from client_server import MessageClient, Server
from twisted.internet.protocol import ClientCreator
from twisted.internet import reactor
from twisted.protocols import amp
from time import sleep
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService
def makeClientCall():
def show_result(result):
print 'result from client: %r' % result
d = ClientCreator(reactor, amp.AMP).connectTCP(
'127.0.0.1', 1235).addCallback(
lambda p: p.callRemote(MessageClient)).addCallback(
show_result)
application = Application("server app")
endpoint = TCP4ServerEndpoint(reactor, 1234)
factory = Factory()
factory.protocol = Server
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)
sleep(4)
makeClientCall()
makeClientCall()
client_server.py
from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService
class MessageServer(amp.Command):
response = [('msg', amp.String())]
class ServerStart(amp.Command):
arguments = [('truncate', amp.Boolean())]
response = [('msg', amp.String())]
class Server(amp.AMP):
def message_it(self):
msg = 'This is a message from the server'
print 'msg sent to client: %s' % msg
return {'msg': msg}
MessageServer.responder(message_it)
def start_it(self, truncate):
msg = 'Starting processing...'
return {'msg': msg}
ServerStart.responder(start_it)
class MessageClient(amp.Command):
response = [('msg', amp.String())]
class Client(amp.AMP):
def message_it(self):
msg = 'This is a message from the client'
return {'msg': msg}
MessageClient.responder(message_it)
Here's a simple example of a bi-directional AMP client and server. The key is that the AMP protocol class holds a reference to the client connection and provides a callRemote
method.
Of course, I only know this from digging through the AMP code. Twisted documentation is lacking at best, at least outside the core.
File: count_server.tac
from twisted.protocols.amp import AMP
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.application.service import Application
from twisted.application.internet import StreamServerEndpointService
from count_client import Counter
application = Application('test AMP server')
endpoint = TCP4ServerEndpoint(reactor, 8750)
factory = Factory()
factory.protocol = Counter
service = StreamServerEndpointService(endpoint, factory)
service.setServiceParent(application)
File: count_client.py
if __name__ == '__main__':
import count_client
raise SystemExit(count_client.main())
from sys import stdout
from twisted.python.log import startLogging, err
from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ClientEndpoint
class Count(amp.Command):
arguments = [('n', amp.Integer())]
response = [('ok', amp.Boolean())]
class Counter(amp.AMP):
@Count.responder
def count(self, n):
print 'received:', n
n += 1
if n < 10:
print 'sending:', n
self.callRemote(Count, n=n)
return {'ok': True}
def connect():
endpoint = TCP4ClientEndpoint(reactor, '127.0.0.1', 8750)
factory = Factory()
factory.protocol = Counter
return endpoint.connect(factory)
def main():
startLogging(stdout)
d = connect()
d.addErrback(err, 'connection failed')
d.addCallback(lambda p: p.callRemote(Count, n=1))
d.addErrback(err, 'call failed')
reactor.run()
Server output:
$ twistd -n -y count_server.tac
2013-03-27 11:05:18-0500 [-] Log opened.
2013-03-27 11:05:18-0500 [-] twistd 12.2.0 (/usr/bin/python 2.7.3) starting up.
2013-03-27 11:05:18-0500 [-] reactor class: twisted.internet.epollreactor.EPollReactor.
2013-03-27 11:05:18-0500 [-] Factory starting on 8750
2013-03-27 11:05:18-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:22-0500 [twisted.internet.protocol.Factory] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 1
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 2
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 3
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 4
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 5
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 6
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 7
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] sending: 8
2013-03-27 11:05:22-0500 [Counter,0,127.0.0.1] received: 9
2013-03-27 11:05:26-0500 [Counter,0,127.0.0.1] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 8750) PEER:IPv4Address(TCP, '127.0.0.1', 58195))
^C2013-03-27 11:05:31-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:31-0500 [-] (TCP Port 8750 Closed)
2013-03-27 11:05:31-0500 [-] Stopping factory <twisted.internet.protocol.Factory instance at 0x2adc368>
2013-03-27 11:05:31-0500 [-] Main loop terminated.
2013-03-27 11:05:31-0500 [-] Server Shut Down.
Client output:
$ python count_client.py
2013-03-27 11:05:22-0500 [-] Log opened.
2013-03-27 11:05:22-0500 [-] Starting factory <twisted.internet.protocol.Factory instance at 0x246bf80>
2013-03-27 11:05:22-0500 [Uninitialized] Counter connection established (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750))
2013-03-27 11:05:22-0500 [Counter,client] received: 2
2013-03-27 11:05:22-0500 [Counter,client] sending: 3
2013-03-27 11:05:22-0500 [Counter,client] received: 4
2013-03-27 11:05:22-0500 [Counter,client] sending: 5
2013-03-27 11:05:22-0500 [Counter,client] received: 6
2013-03-27 11:05:22-0500 [Counter,client] sending: 7
2013-03-27 11:05:22-0500 [Counter,client] received: 8
2013-03-27 11:05:22-0500 [Counter,client] sending: 9
^C2013-03-27 11:05:26-0500 [-] Received SIGINT, shutting down.
2013-03-27 11:05:26-0500 [Counter,client] Counter connection lost (HOST:IPv4Address(TCP, '127.0.0.1', 58195) PEER:IPv4Address(TCP, '127.0.0.1', 8750))
2013-03-27 11:05:26-0500 [Counter,client] Stopping factory <twisted.internet.protocol.Factory instance at 0x246bf80>
2013-03-27 11:05:26-0500 [-] Main loop terminated.
The previous response by Ryan P leaves something to be desired. In particular, it never actually used the AMP response, preferring instead to chain callRemote calls everywhere. Here's my answer, based on the ampserver.py (unchanged) and ampclient.py (rewritten) examples in twisted. This answers the underlying question on bi-directional messaging in a way which fits the question (though not exactly the description).
Short summary, add a callback to the deferred which you get from callRemote, which will have the response(s) in its argument. It's a normal dictionary so you can do what you want with it.
File: ampserver.py
from twisted.protocols import amp
class Sum(amp.Command):
arguments = [('a', amp.Integer()),
('b', amp.Integer())]
response = [('total', amp.Integer())]
class Divide(amp.Command):
arguments = [('numerator', amp.Integer()),
('denominator', amp.Integer())]
response = [('result', amp.Float())]
errors = {ZeroDivisionError: 'ZERO_DIVISION'}
class Math(amp.AMP):
def sum(self, a, b):
total = a + b
print 'Did a sum: %d + %d = %d' % (a, b, total)
return {'total': total}
Sum.responder(sum)
def divide(self, numerator, denominator):
result = float(numerator) / denominator
print 'Divided: %d / %d = %f' % (numerator, denominator, result)
return {'result': result}
Divide.responder(divide)
def main():
from twisted.internet import reactor
from twisted.internet.protocol import Factory
pf = Factory()
pf.protocol = Math
reactor.listenTCP(1234, pf)
print 'started'
reactor.run()
if __name__ == '__main__':
main()
File: ampclient.py
from twisted.internet import reactor, protocol
from twisted.internet.task import deferLater
from twisted.protocols import amp
from ampserver import Sum, Divide
connection = None
class MathClient(amp.AMP):
def connectionMade(self):
global connection
connection = self
class MathFactory(protocol.ReconnectingClientFactory):
protocol = MathClient
if __name__ == '__main__':
reactor.connectTCP('127.0.0.1', 1234, MathFactory())
def simpleSum():
global connection
d = connection.callRemote(Sum, a=1, b=5)
def prin(result):
print(result)
d.addCallback(prin)
return d
deferLater(reactor, 1, simpleSum)
deferLater(reactor, 3, simpleSum)
deferLater(reactor, 6, simpleSum)
deferLater(reactor, 9, simpleSum)
deferLater(reactor, 12, simpleSum)
deferLater(reactor, 15, simpleSum)
deferLater(reactor, 18, simpleSum).addCallback(lambda _: reactor.stop())
reactor.run()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With