 

Write a CSV file to a Kafka topic

I have a large CSV file and I want to write its rows to a Kafka topic.

import csv
from kafka import KafkaProducer

def producer():
    producer = KafkaProducer(bootstrap_servers='mykafka-broker')
    with open('/home/antonis/repos/testfile.csv') as file:
        reader = csv.DictReader(file, delimiter=";")
        for row in reader:
            producer.send(topic='stable_topic', value=row)
            producer.flush()

if __name__ == '__main__':
    producer()

This code produces an error:

AssertionError: value must be bytes

The file looks like:

"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22
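To see where the non-bytes value comes from, the sketch below parses the sample rows above with only the standard library (reading from an in-memory string instead of the real file, which is an assumption for illustration). Each row that csv.DictReader yields is a dict of strings, which KafkaProducer cannot send as-is. It also shows that the sample is comma-separated, so reading it with delimiter=";" as in the code above collapses each line into a single column.

```python
import csv
import io

# The sample file from the question, held in memory for illustration.
SAMPLE = '''"timestamp","name","age"
2020-03-01 00:00:01,John,36
2020-03-01 00:00:01,Peter,22
'''

# Parsed with a comma delimiter, each row becomes a dict of strings.
rows = list(csv.DictReader(io.StringIO(SAMPLE), delimiter=","))
for row in rows:
    print(type(row).__name__, row)

# Parsed with ";" instead, no line contains the delimiter,
# so the header (and every row) ends up as one single column.
semicolon_reader = csv.DictReader(io.StringIO(SAMPLE), delimiter=";")
print(semicolon_reader.fieldnames)
```

Note that every value, including age, comes back as a string; converting fields to other types is up to you if consumers need numbers.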

Can anyone help me with this?

Asked by e7lT2P on Sep 21 '25 05:09


1 Answer

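You need to properly serialise your values. Without a value_serializer, KafkaProducer expects each value to already be bytes, but csv.DictReader yields a dict for every row, which triggers the AssertionError.

The following should do the trick, encoding each row as JSON: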

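import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='mykafka-broker',
    # serialise each row (a dict) to JSON, then encode it as UTF-8 bytes
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)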
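Combining this serializer with the loop from the question could look like the following sketch. It assumes the kafka-python package (which provides KafkaProducer) and a UTF-8 encoded file; the helper names serialize_row and send_csv are hypothetical. It calls flush() once after the loop rather than after every row: send() is asynchronous and batches records, while flush() blocks until buffered records are sent, so flushing per row defeats batching.

```python
import csv
import json


def serialize_row(row):
    """Encode one CSV row (a dict of strings) as UTF-8 JSON bytes."""
    return json.dumps(row).encode("utf-8")


def send_csv(path, topic, bootstrap_servers, delimiter=","):
    """Stream every row of a CSV file to a Kafka topic as a JSON message.

    Assumes kafka-python is installed; it is imported here so that
    serialize_row can be used and tested without it.
    """
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_servers,
        value_serializer=serialize_row,  # dict -> JSON bytes
    )
    # newline="" is what the csv module documentation recommends for files;
    # the UTF-8 encoding is an assumption about the input file.
    with open(path, newline="", encoding="utf-8") as f:
        for row in csv.DictReader(f, delimiter=delimiter):
            producer.send(topic, value=row)  # queued, sent in batches
    # Block once until every queued record has been sent.
    producer.flush()
    producer.close()
```

With the values from the question, a call would be send_csv('/home/antonis/repos/testfile.csv', 'stable_topic', 'mykafka-broker').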
Answered by Giorgos Myrianthous on Sep 22 '25 20:09