From the AvroProducer
example in the confluent-kafka-python repo, it appears that the key/value schema are loaded from files. That is, from this code:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
value_schema = avro.load('ValueSchema.avsc')
key_schema = avro.load('KeySchema.avsc')
value = {"name": "Value"}
key = {"name": "Key"}
avroProducer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2', 'schema.registry.url': 'http://schem_registry_host:port'}, default_key_schema=key_schema, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value=value, key=key)
it appears that the files ValueSchema.avsc
and KeySchema.avsc
are loaded independently of the Avro Schema Registry.
Is this right? What's the point of referencing the URL for the Avro Schema Registry, but then loading schema from disk for key/value's?
Please clarify.
There are two types of schema in Kafka Connect, key schema and value schema. Kafka Connect sends messages to Apache Kafka containing both your value and a key. A key schema enforces a structure for keys in messages sent to Apache Kafka. A value schema enforces a structure for values in messages sent to Apache Kafka.
When Avro data is stored in a file, its schema is stored with it, so that files may be processed later by any program. If the program reading the data expects a different schema this can be easily resolved, since both schemas are present.
I ran into the same issue where it was initially unclear what the point of the local files are. As mentioned by the other answers, for the first write to an Avro topic, or an update to the topic's schema, you need the schema string - you can see this from the Kafka REST documentation here.
Once you have the schema in the registry, you can read it with REST (I used the requests Python module in this case) and use the avro.loads() method to get it. I found this useful because the produce() function requires that you have a value schema for your AvroProducer, and this code will work without that local file being present:
get_schema_req_data = requests.get("http://1.2.3.4:8081/subjects/sample_value_schema/versions/latest")
get_schema_req_data.raise_for_status()
schema_string = get_schema_req_data.json()['schema']
value_schema = avro.loads(schema_string)
avroProducer = AvroProducer({'bootstrap.servers': '1.2.3.4:9092', 'schema.registry.url': 'http://1.2.3.4:8081'}, default_value_schema=value_schema)
avroProducer.produce(topic='my_topic', value={"data" : "that matches your schema" })
Hope this helps.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With