
How can an org.apache.kafka.connect.data.Decimal stored in an Avro file be converted to a Python type?


I am trying to interpret an Avro record stored by Debezium in Kafka, using Python. One of its fields has this schema:

{
  "name": "id",
  "type": {
    "type": "bytes",
    "scale": 0,
    "precision": 64,
    "connect.version": 1,
    "connect.parameters": {
      "scale": "0"
    },
    "connect.name": "org.apache.kafka.connect.data.Decimal",
    "logicalType": "decimal"
  }
}

I am not sure which Python 3 type this corresponds to. How can this value be deserialised?

Thanks in advance!

Ravindranath Akila asked Oct 23 '17 08:10




2 Answers

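org.apache.kafka.connect.data.Decimal is the byte representation of an unscaled integer: big-endian two's complement, as produced by Java's BigInteger.toByteArray(). When the record is rendered as JSON, those bytes appear as a base64 string. To convert the value to a Python Decimal, decode the base64 string to bytes if necessary, read the bytes as a signed integer, and then shift the decimal point left by the parameters.scale value.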

A value with this schema:

{
  "type": "bytes",
  "name": "org.apache.kafka.connect.data.Decimal",
  "version": 1,
  "parameters": {
    "scale": "9",
    "connect.decimal.precision": "38"
  },
  "field": "amount"
}

can be converted with the following snippet:

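import base64
import decimal

ctx = decimal.Context(prec=38)  # connect.decimal.precision = 38
unscaled = int.from_bytes(
    base64.b64decode("GZ6ZFQvYpA=="), byteorder='big', signed=True
)  # the bytes are big-endian two's complement, so read them as signed
result = ctx.create_decimal(unscaled).scaleb(-9, context=ctx)  # scale = 9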
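The conversion steps described in this answer can be wrapped in a reusable function. The following is a minimal sketch, not part of any Kafka or Avro library: the name `connect_decimal_to_python` and its parameters are hypothetical. It assumes the payload arrives either as raw bytes (as an Avro reader would typically return them) or as a base64 string (as in a JSON rendering), and it reads the bytes as signed two's complement so that negative values decode correctly.

```python
import base64
import decimal
from typing import Union


def connect_decimal_to_python(raw: Union[bytes, str], scale: int) -> decimal.Decimal:
    """Decode a Connect Decimal payload (hypothetical helper)."""
    # Assumption: a str input is the base64 form found in JSON output;
    # bytes input is used as-is.
    if isinstance(raw, str):
        raw = base64.b64decode(raw)

    # Big-endian two's complement, matching Java's BigInteger.toByteArray().
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)

    # Build the Decimal from (sign, digits, exponent). This construction is
    # exact and does not depend on the precision of the current context.
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(ch) for ch in str(abs(unscaled)))
    return decimal.Decimal((sign, digits, -scale))
```

For the schema in this answer, the call would be `connect_decimal_to_python("GZ6ZFQvYpA==", scale=9)`. The scale is stored as a string in the schema parameters, so it needs an `int(...)` conversion before being passed in.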
Bedla answered Oct 11 '22 13:10


If you look at

https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java

public static byte[] fromLogical(Schema schema, BigDecimal value) {
    if (value.scale() != scale(schema))
        throw new DataException("BigDecimal has mismatching scale value for given Decimal schema");
    return value.unscaledValue().toByteArray();
}

As you can see, it uses BigDecimal, whose Python equivalent is decimal.Decimal. See the related question:

What is the python for Java's BigDecimal?

So the Python type you are looking for in this case is decimal.Decimal.
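To make the correspondence concrete, here is a rough Python mirror of the Java `fromLogical` method quoted above, together with its inverse. It is only a sketch under stated assumptions: the function names `from_logical` and `to_logical` are hypothetical, the scale is passed as a plain integer instead of a schema object, and the byte length is chosen to match what `BigInteger.toByteArray()` would produce.

```python
import decimal


def from_logical(value: decimal.Decimal, scale: int) -> bytes:
    """Encode a Decimal as unscaled two's complement bytes (hypothetical mirror)."""
    if not value.is_finite():
        raise ValueError("NaN and Infinity cannot be encoded")
    sign, digits, exponent = value.as_tuple()
    # Same check as the Java code: the value's scale must match the schema's.
    if -exponent != scale:
        raise ValueError("Decimal has mismatching scale value for given schema")
    unscaled = int("".join(str(d) for d in digits))
    if sign:
        unscaled = -unscaled
    # Minimal two's complement length: magnitude bits plus one sign bit.
    bits = (unscaled if unscaled >= 0 else ~unscaled).bit_length()
    return unscaled.to_bytes(bits // 8 + 1, byteorder="big", signed=True)


def to_logical(raw: bytes, scale: int) -> decimal.Decimal:
    """Decode unscaled two's complement bytes back into a Decimal."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(ch) for ch in str(abs(unscaled)))
    # Exact construction; no context rounding is applied.
    return decimal.Decimal((sign, digits, -scale))
```

A round trip such as `to_logical(from_logical(decimal.Decimal("123.45"), 2), 2)` should give back the original value, which is a quick way to check the byte handling against real records.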

Part 2 - Deserialization

As for the deserialization itself, I need feedback before I can update the answer. How are you currently deserializing the other fields?

Tarun Lalwani answered Oct 11 '22 14:10