From the Faust documentation I can't find out how to set the consumer to a specific offset.
With confluent-kafka I use consumer.offsets_for_times to find a start_offset and then assign the TopicPartition to that specific offset, something like:
start_offset = consumer.offsets_for_times([
    TopicPartition("prediction.OfferPredictionCheckpoint", 0, int(start_date)),
    TopicPartition("prediction.OfferPredictionCheckpoint", 1, int(start_date)),
])
consumer.assign([
    TopicPartition("prediction.OfferPredictionCheckpoint", partition_number, pos)
])
With Faust I can't find much more than:
consumer_auto_offset_reset
Which only let you set earliest or latest. How would I start reading from a specific hour or beginning of day?
to set the offset to a specific value you can use these example. Here I set the offset to 50000. Every time I start my app, the agent starts reading at the offset 50000. For this I use app.consumer.seek
Here tp takes in two params, topic - test in this case and 0 which is the partition number. For more info faust.types
from faust.types import TP, Message
tp = TP("test", 0)
topic = app.topic(tp.topic)
@app.task()
async def on_start():
    await app.consumer.seek(tp, 50000)
    print("App startet")
@app.agent(topic)
async def receive(stream):
    async for event in stream.events():
        print((event.message.offset, event.value))
                        If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With