Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Faust example of publishing to a kafka topic

I'm curious about how you are supposed to express that you want a message delivered to a Kafka topic in faust. The example in their readme doesn't seem to write to a topic:

import faust

class Greeting(faust.Record):
    from_name: str
    to_name: str

app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)

@app.agent(topic)
async def hello(greetings):
    async for greeting in greetings:
        print(f'Hello from {greeting.from_name} to {greeting.to_name}')

@app.timer(interval=1.0)
async def example_sender(app):
    await hello.send(
        value=Greeting(from_name='Faust', to_name='you'),
    )

if __name__ == '__main__':
    app.main()

I would expect hello.send in the above code to publish a message to the topic, but it doesn't appear to.

There are many examples of reading from topics, and many examples of using the cli to push an ad-hoc message. After combing through the docs, I don't see any clear examples of publishing to topics in code. Am I just being crazy and the above code should work?

like image 789
melchoir55 Avatar asked Jun 27 '19 00:06

melchoir55


3 Answers

You can use sink to tell Faust where to deliver the results of an agent function. You can also use multiple topics as sinks at once if you want.

@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
    async for record in records:
        result = do_something(record)
        yield result
like image 50
BWStearns Avatar answered Oct 06 '22 18:10

BWStearns


If you want a Faust producer only (not combined with a consumer/sink), the original question actually has the right bit of code, here's a fully functional script that publishes messages to a 'faust_test' Kafka topic that is consumable by any Kafka/Faust consumer.

Run the code below like this: python faust_producer.py worker

"""Simple Faust Producer"""
import faust

if __name__ == '__main__':
    """Simple Faust Producer"""

    # Create the Faust App
    app = faust.App('faust_test_app', broker='localhost:9092')
    topic = app.topic('faust_test')

    # Send messages
    @app.timer(interval=1.0)
    async def send_message(message):
        await topic.send(value='my message')

    # Start the Faust App
    app.main()
like image 24
Brian Wylie Avatar answered Oct 06 '22 18:10

Brian Wylie


The send() function is the correct one to call to write to topics. You can even specify a particular partition, just like the equivalent Java API call.

Here is the reference for the send() method:

https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send

like image 40
deed02392 Avatar answered Oct 06 '22 16:10

deed02392