Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to connect kafka topic with web endpoint using Faust Python package?

I have a simple app with two functions: one listens to a Kafka topic and the other serves a web endpoint. I want to stream server-sent events (SSE), i.e. a text/event-stream response, so that the client can listen to it using EventSource.

I have the following code for now, where each function is doing its particular job:

import faust

from faust.web import Response

# Faust application "app1" talking to the Kafka broker at localhost:29092,
# configured with the "raw" value serializer.
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
# Topic object for the Kafka topic named "test".
test_topic = app.topic("test")


@app.agent(test_topic)
async def test_topic_agent(stream):
    """Print each value received on ``stream`` and yield it back unchanged."""
    async for message in stream:
        print(f"test_topic_agent RECEIVED -- {message!r}")
        yield message


@app.page("/")
async def index(self, request):
    """Answer requests for "/" with the fixed text "yey"."""
    reply = self.text("yey")
    return reply

Now I want index to do something like the following code, but using Faust:

import asyncio
from aiohttp import web
from aiohttp.web import Response
from aiohttp_sse import sse_response
from datetime import datetime


async def hello(request):
    """Send the current server time to the client once per second as SSE.

    Opens a server-sent-events response for ``request`` via ``sse_response``
    and, in an endless loop, formats the current time, prints it, sends it to
    the client and sleeps for one second.
    """
    async with sse_response(request) as resp:
        while True:
            data = 'Server Time : {}'.format(datetime.now())
            print(data)
            await resp.send(data)
            # asyncio.sleep() no longer accepts ``loop`` (deprecated in 3.8,
            # removed in 3.10); it always uses the running event loop.
            await asyncio.sleep(1)
    # Unreachable while the loop above has no break; kept so the handler still
    # returns the response if an exit condition is added.
    return resp


async def index(request):
    """Return the HTML page whose script subscribes to ``/hello`` with EventSource.

    The page writes the data of every received event into the ``response``
    element.
    """
    page = """
        <html>
        <body>
            <script>
                var evtSource = new EventSource("/hello");
                evtSource.onmessage = function(e) {
                    document.getElementById('response').innerText = e.data
                }
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return Response(content_type='text/html', text=page)


# Build the aiohttp application, register both GET handlers, then serve it
# on 127.0.0.1:8080.
app = web.Application()
for path, handler in (('/hello', hello), ('/', index)):
    app.router.add_route('GET', path, handler)
web.run_app(app, host='127.0.0.1', port=8080)

I have tried this:

import faust

from faust.web import Response

# Faust application "app1" talking to the Kafka broker at localhost:29092,
# configured with the "raw" value serializer.
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
# Topic object for the Kafka topic named "test".
test_topic = app.topic("test")


# @app.agent(test_topic)
# async def test_topic_agent(stream):
#     async for value in stream:
#         print(f"test_topic_agent RECEIVED -- {value!r}")
#         yield value


@app.page("/", name="t1")
@app.agent(test_topic, name="t")
async def index(self, request):
    """Return the fixed text "yey".

    NOTE(review): decorators apply bottom-up, so ``@app.page`` receives the
    Agent object produced by ``@app.agent`` instead of a function. That is
    what the traceback that follows reports: ``'Agent' object has no
    attribute '__name__'``, raised at import time.
    """
    return self.text("yey")

But it gives me the following error:

Traceback (most recent call last):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 299, in find_app
    val = symbol_by_name(app, imp=imp)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 262, in symbol_by_name
    module = imp(  # type: ignore
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
    return imp(module, package=package)
  File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
    async def index(self, request):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
    view = view_base.from_handler(cast(ViewHandlerFun, fun))
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
    return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/maverick/.pyenv/versions/faust_demo/bin/faust", line 8, in <module>
    sys.exit(cli())
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/click/core.py", line 781, in main
    with self.make_context(prog_name, args, **extra) as ctx:
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 407, in make_context
    self._maybe_import_app()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 372, in _maybe_import_app
    find_app(appstr)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/cli/base.py", line 303, in find_app
    val = imp(app)
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/utils/imports.py", line 376, in import_from_cwd
    return imp(module, package=package)
  File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/importlib/__init__.py", line 127, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 1014, in _gcd_import
  File "<frozen importlib._bootstrap>", line 991, in _find_and_load
  File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 783, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/Users/maverick/company/demo1/baiohttp-demo/app1.py", line 18, in <module>
    async def index(self, request):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/app/base.py", line 1231, in _decorator
    view = view_base.from_handler(cast(ViewHandlerFun, fun))
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/web/views.py", line 50, in from_handler
    return type(fun.__name__, (cls,), {
AttributeError: 'Agent' object has no attribute '__name__'

I even tried this:

import faust

from faust.web import Response

# Faust application "app1" talking to the Kafka broker at localhost:29092,
# configured with the "raw" value serializer.
app = faust.App("app1", broker="kafka://localhost:29092", value_serializer="raw")
# Topic object for the Kafka topic named "test".
test_topic = app.topic("test")


# @app.agent(test_topic)
# async def test_topic_agent(stream):
#     async for value in stream:
#         print(f"test_topic_agent RECEIVED -- {value!r}")
#         yield value


@app.agent(test_topic, name="t")
@app.page("/", name="t1")
async def index(self, request):
    """Return the fixed text "yey".

    NOTE(review): decorators apply bottom-up, so here ``@app.agent`` wraps
    whatever ``@app.page`` returned (apparently a view class, given the
    ``__init__()`` in the error). When the worker starts, the agent calls it
    with the stream only (``self.fun(actual_stream)``), which raises the
    ``TypeError`` about the missing ``web`` argument shown in the log that
    follows.
    """
    return self.text("yey")

But I get the following error:

[2020-03-28 10:32:50,676] [29976] [INFO] [^--Producer]: Creating topic 'app1-__assignor-__leader'
[2020-03-28 10:32:50,695] [29976] [INFO] [^--ReplyConsumer]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^--AgentManager]: Starting...
[2020-03-28 10:32:50,695] [29976] [INFO] [^---Agent: app1.index]: Starting...
[2020-03-28 10:32:50,696] [29976] [ERROR] [^Worker]: Error: TypeError("__init__() missing 1 required positional argument: 'web'")
Traceback (most recent call last):
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/worker.py", line 273, in execute_from_commandline
    self.loop.run_until_complete(self._starting_fut)
  File "/Users/maverick/.pyenv/versions/3.8.1/lib/python3.8/asyncio/base_events.py", line 612, in run_until_complete
    return future.result()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 767, in _actually_start
    await child.maybe_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/manager.py", line 58, in on_start
    await agent.maybe_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 795, in maybe_start
    await self.start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 736, in start
    await self._default_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 743, in _default_start
    await self._actually_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/mode/services.py", line 760, in _actually_start
    await self.on_start()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 282, in on_start
    await self._on_start_supervisor()
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 312, in _on_start_supervisor
    res = await self._start_one(
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 251, in _start_one
    return await self._start_task(
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 617, in _start_task
    actor = self(
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 525, in __call__
    return self.actor_from_stream(stream,
  File "/Users/maverick/.pyenv/versions/3.8.1/envs/faust_demo/lib/python3.8/site-packages/faust/agents/agent.py", line 552, in actor_from_stream
    res = self.fun(actual_stream)
TypeError: __init__() missing 1 required positional argument: 'web'
[2020-03-28 10:32:50,703] [29976] [INFO] [^Worker]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Stopping...
[2020-03-28 10:32:50,703] [29976] [INFO] [^-App]: Flush producer buffer...
[2020-03-28 10:32:50,703] [29976] [INFO] [^--TableManager]: Stopping...

Is there a way to do this? Thanks a lot in advance!

like image 565
Maverick Avatar asked Mar 28 '20 09:03

Maverick


3 Answers

I was able to get the Faust Service, Agent and Web endpoints to interact. There are three parts to this: a Service class that handles the lifecycle of the application; Kafka and portal endpoints that forward messages to the business class; and the business class itself, which performs some functions and passes the results back via the Service class onto Kafka.

Note: this is pseudocode; you will need to fill in some basic stuff.

import MyBusinessClass

# Instance of the class that handles the business functions. Events are
# routed to it from Kafka or from the web portal, and it reacts by sending a
# message back on Kafka via the Service class.
actor = MyBusinessClass()

# Kafka topic whose messages are received and handed over to the business
# class.
# NOTE(review): `app` is not defined in this pseudocode; presumably a
# faust.App created beforehand.
topic = app.topic('mytopic', value_serializer='raw')

@app.service
class MyService:
    """Lifecycle service that connects Kafka, the web portal and the business class.

    Pseudocode: ``...`` marks the parts that still have to be filled in, and
    ``from_dict``, ``MsgPojo`` and ``loads`` must be provided by the caller.

    NOTE(review): Faust's documentation applies ``@app.service`` to a
    ``mode.Service`` subclass -- confirm the base class when filling this in.
    """

    async def on_start(self):
        """Hand the business class a reference to this service, then start up."""
        actor.setService(self)
        ...  # do startup stuff

    async def on_stop(self):
        """Release whatever ``on_start`` set up."""
        ...  # do shutdown stuff

    # Kafka consumption end point
    @app.agent(topic)
    async def process_kafka(stream):
        """Decode every payload from the topic and pass it to the business class."""
        async for payload in stream:
            obj = from_dict(data_class=MsgPojo, data=loads(payload))
            # hand over the message to the business class
            await actor.onKafkaMsg(obj)

    # Web portal endpoint
    @app.page('/main')
    async def portal_main(self, request):
        """Forward a portal request to the business class."""
        ...  # do something
        # Send portal actions over the internal channel to the business class
        await actor.onPortalMsg(request)

    async def sendOrder(self, order):
        """Publish ``order`` on the Kafka topic.

        Called by the business class to send messages over Kafka in response
        to an action on the web portal or to a message from Kafka.
        """
        await topic.send(value=order)



class MyBusinessClass:
    """Business logic that reacts to Kafka and portal events.

    Pseudocode: ``...`` marks the processing that still has to be filled in.
    Results are sent back over Kafka through the service passed to
    ``setService``.
    """

    # gets a handle to the service class for network communication
    def setService(self, service: "MyService"):
        """Store the service used for network communication."""
        self.service = service

    async def onKafkaMsg(self, obj):
        """Handle a message received from Kafka and send the result as an order.

        Declared ``async`` because it awaits ``sendOrder`` and is itself
        awaited by the Kafka agent.
        """
        result = ...  # placeholder: do something with obj
        await self.service.sendOrder(result)

    async def onPortalMsg(self, portalreq):
        """Handle a request from the web portal and send the result as an order.

        Declared ``async`` because it awaits ``sendOrder`` and is itself
        awaited by the portal endpoint.
        """
        result = ...  # placeholder: do something with portalreq
        await self.service.sendOrder(result)


    
like image 95
Inderdeep Singh Avatar answered Oct 26 '22 13:10

Inderdeep Singh


The Faust worker also exposes a web server on every instance, which by default runs on port 6066.

The server uses the aiohttp HTTP server library, and you can take advantage of this to create server-sent event (SSE) streaming like in your example code.

You can create an agent that reads from the Kafka topic test and updates a variable, last_message_from_topic, with the last message from the topic. This variable is also visible from your web pages.

In the index page (@app.page('/')) the EventSource interface is used to receive server-sent events. It connects to the server over HTTP and receives events in text/event-stream format from the page /hello without closing the connection.

Every second, the /hello page sends a text message containing the last message from the Kafka topic test and the current server time.

Here is the code of my file my_worker.py:

import asyncio
from datetime import datetime

import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response

# Faust application "app1" talking to the Kafka broker at localhost:9092,
# configured with the "json" value serializer.
app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
# Topic object for the Kafka topic named "test".
test_topic = app.topic("test")

# One-element list used as a mutable holder: the agent overwrites index 0 and
# the /hello page reads it.
last_message_from_topic = ['No messages yet']


@app.agent(test_topic)
async def greet(greetings):
    """Store the most recent value from the stream in ``last_message_from_topic``."""
    async for latest in greetings:
        last_message_from_topic[0] = latest


@app.page('/hello')
async def hello(self, request):
    """Send the latest topic message and the server time once per second as SSE.

    Opens a server-sent-events response for ``request`` via ``sse_response``
    and, in an endless loop, builds a line from ``last_message_from_topic[0]``
    and the current time, prints it, sends it to the client and sleeps for one
    second.
    """
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'

            print(data)
            await resp.send(data)
            # asyncio.sleep() no longer accepts ``loop`` (deprecated in 3.8,
            # removed in 3.10); it always uses the running event loop.
            await asyncio.sleep(1)
    # Unreachable while the loop above has no break; kept so the handler still
    # returns the response if an exit condition is added.
    return resp


@app.page('/')
async def index(self, request):
    """Return the HTML page whose script subscribes to ``/hello`` with EventSource.

    The page writes the data of every received event into the ``response``
    element.
    """
    markup = """
        <html>
        <body>
            <script>
                var evtSource = new EventSource("/hello");
                evtSource.onmessage = function(e) {
                    document.getElementById('response').innerText = e.data
                }
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return Response(content_type='text/html', text=markup)

now you have to start the Faust worker with the following command:

faust -A my_worker worker -l info

on your web browser you can access http://localhost:6066/:

enter image description here


here is the code to send messages to Kafka on the topic test (from another python file):

import time
import json

from kafka import  KafkaProducer

# Producer for the broker at localhost:9092; every value is JSON-encoded to
# UTF-8 bytes before it is sent.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)


# Publish one numbered message per second to the topic "test".
for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')

# send() only queues the record for asynchronous delivery; flush so the last
# messages are not lost when the script exits.
producer.flush()
like image 31
kederrac Avatar answered Oct 26 '22 12:10

kederrac


Reading the Faust web documentation, it does not seem to handle SSE.

@app.agent is called back when a Kafka message is consumed, and @app.page when an HTTP request is processed. Combining them is probably not possible.

An alternative approach using faust.web is to poll from JavaScript.
For instance:

import faust
from faust.web import Response

# Faust application "myapp" talking to the Kafka broker at kafka:9092,
# configured with the "raw" value serializer.
app = faust.App("myapp", broker="kafka://kafka:9092", value_serializer="raw")
# Topic object for the Kafka topic named "test".
test_topic = app.topic("test")
# Latest message text, kept as an attribute on the app: the agent writes it
# and the /msg page returns it.
app.lastmsg = ""

@app.agent(test_topic)
async def test_topic_agent(stream):
    """Keep ``app.lastmsg`` set to the text of the newest message; re-yield each value.

    The app is configured with the "raw" value serializer, so values are
    expected to arrive as ``bytes``. ``str()`` on bytes would store the
    ``b'...'`` repr, so bytes are decoded as UTF-8 instead; any other type
    still goes through ``str()``.
    """
    async for value in stream:
        if isinstance(value, (bytes, bytearray)):
            # Undecodable bytes are replaced rather than crashing the agent.
            app.lastmsg = value.decode("utf-8", errors="replace")
        else:
            app.lastmsg = str(value)
        yield value

@app.page("/msg")
async def msg(self, request):
    """Return the current value of ``app.lastmsg`` as a text response."""
    latest = app.lastmsg
    return self.text(latest)

@app.page("/")
async def index(self, request):
    """Return the HTML page that polls ``/msg`` every second.

    The page's script issues a GET to ``/msg`` on a 1000 ms interval and, on a
    200 response, writes the response into the ``response`` element.
    """
    page_markup = """
        <html>
        <body>
            <script>
                setInterval(()=>{
                    let xhr = new XMLHttpRequest();
                    xhr.open('GET', '/msg');
                    xhr.send();
                    xhr.onload = function() {
                        if (xhr.status == 200) {
                            document.getElementById('response').innerText = xhr.response
                        }
                    }
                },1000);
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return self.html(page_markup)

This naive implementation stores the Kafka message in app.lastmsg, which can be fetched through the /msg endpoint.

In order to use SSE you can use asyncio for the web part and faust for the kafka consumer.

like image 38
mpromonet Avatar answered Oct 26 '22 12:10

mpromonet