I'm curious about how you are supposed to express that you want a message delivered to a Kafka topic in faust. The example in their readme doesn't seem to write to a topic:
import faust
class Greeting(faust.Record):
from_name: str
to_name: str
app = faust.App('hello-app', broker='kafka://localhost')
topic = app.topic('hello-topic', value_type=Greeting)
@app.agent(topic)
async def hello(greetings):
async for greeting in greetings:
print(f'Hello from {greeting.from_name} to {greeting.to_name}')
@app.timer(interval=1.0)
async def example_sender(app):
await hello.send(
value=Greeting(from_name='Faust', to_name='you'),
)
if __name__ == '__main__':
app.main()
I would expect hello.send
in the above code to publish a message to the topic, but it doesn't appear to.
There are many examples of reading from topics, and many examples of using the cli to push an ad-hoc message. After combing through the docs, I don't see any clear examples of publishing to topics in code. Am I just being crazy and the above code should work?
You can use sink
to tell Faust where to deliver the results of an agent function. You can also use multiple topics as sinks at once if you want.
@app.agent(topic_to_read_from, sink=[destination_topic])
async def fetch(records):
async for record in records:
result = do_something(record)
yield result
If you want a Faust producer only (not combined with a consumer/sink), the original question actually has the right bit of code, here's a fully functional script that publishes messages to a 'faust_test' Kafka topic that is consumable by any Kafka/Faust consumer.
Run the code below like this: python faust_producer.py worker
"""Simple Faust Producer"""
import faust
if __name__ == '__main__':
"""Simple Faust Producer"""
# Create the Faust App
app = faust.App('faust_test_app', broker='localhost:9092')
topic = app.topic('faust_test')
# Send messages
@app.timer(interval=1.0)
async def send_message(message):
await topic.send(value='my message')
# Start the Faust App
app.main()
The send()
function is the correct one to call to write to topics. You can even specify a particular partition, just like the equivalent Java API call.
Here is the reference for the send()
method:
https://faust.readthedocs.io/en/latest/reference/faust.topics.html#faust.topics.Topic.send
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With