Does anybody have any experience/pointers on testing a Flask Content Streaming resource? My application uses Redis Pub/Sub, when receiving a message in a channel it streams the text 'data: {"value":42}
'
The implementation is following Flask docs at: docs
and my unit tests are done following Flask docs too. The messages to Redis pub/sub are sent by a second resource (POST).
I'm creating a thread to listen to the stream while I POST on the main application to the resource that publishes to Redis.
Although the connection assertions pass (I receive OK 200 and a mimetype 'text/event-stream') the data object is empty.
My unit test is like this:
def test_04_receiving_events(self):
headers = [('Content-Type', 'application/json')]
data = json.dumps({"value": 42})
headers.append(('Content-Length', len(data)))
def get_stream():
rv_stream = self.app.get('stream/data')
rv_stream_object = json.loads(rv_stream.data) #error (empty)
self.assertEqual(rv_stream.status_code, 200)
self.assertEqual(rv_stream.mimetype, 'text/event-stream')
self.assertEqual(rv_stream_object, "data: {'value': 42}")
t.stop()
threads = []
t = Thread(target=get_stream)
threads.append(t)
t.start()
time.sleep(1)
rv_post = self.app.post('/sensor', headers=headers, data=data)
threads_done = False
while not threads_done:
threads_done = True
for t in threads:
if t.is_alive():
threads_done = False
time.sleep(1)
The app resource is:
@app.route('/stream/data')
def stream():
def generate():
pubsub = db.pubsub()
pubsub.subscribe('interesting')
for event in pubsub.listen():
if event['type'] == 'message':
yield 'data: {"value":%s}\n\n' % event['data']
return Response(stream_with_context(generate()),
direct_passthrough=True,
mimetype='text/event-stream')
Any pointers or examples of how to test a Content Stream in Flask? Google seems to not help much on this one, unless I'm searching the wrong keywords.
Thanks in advance.
You are starting the thread to get_stream()
before POSTing any data to REDIS.
This means that it won't find any events and will return without streaming any data.
I believe you don't needs threads at all, you can simply use the POST to setup data for your integration test and then call the stream.
def test_04_receiving_events(self):
headers = [('Content-Type', 'application/json')]
data = json.dumps({"value": 42})
headers.append(('Content-Length', len(data)))
rv_post = self.app.post('/sensor', headers=headers, data=data)
rv_stream = self.app.get('stream/data')
rv_stream_object = json.loads(rv_stream.data)
self.assertEqual(rv_stream.status_code, 200)
self.assertEqual(rv_stream.mimetype, 'text/event-stream')
self.assertEqual(rv_stream_object, "data: {'value': 42}")
If you are interested rather in integration testing than unit testing, my suggestion would be to use the Postman App and its command line integration with newman.
You can have all variables and parameters in an environment and all requests to be tested in a collection. Postman offers docs and tutorials for this, see https://learning.postman.com/. You can do a lot with the free version.
Set Authorization, Params, Headers to match the incoming requests from the service that sends requests to your instance to be tested. When you are done with preparing your requests to successfully run in postman, then you write tests in postman where you might assert specific parts of a requests response body. Postman also offers pre-build snippets to write these.
pm.test("Body includes 'some string'", function () {
pm.expect(pm.response.text()).to.include("some string");
});
You can also export your collection of requests (includes tests) and environment as json and run it in a shell with newman.
$ newman run mycollection.json -e dev_environment.json
This is useful if you want to integrate the tests into a CICD pipeline.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With