Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

from_json of Spark sql return null values

I loaded a parquet file into a spark dataframe as follows :

val message= spark.read.parquet("gs://defenault-zdtt-devde/pubsub/part-00001-e9f8c58f-7de0-4537-a7be-a9a8556sede04a-c000.snappy.parquet")

when I perform a collect on my dataframe I get the following result :

message.collect()

Array[org.apache.spark.sql.Row] = Array([118738748835150,2018-08-20T17:44:38.742Z,{"id":"uplink-3130-85bc","device_id":60517119992794222,"group_id":69,"group":"box-2478-2555","profile_id":3,"profile":"eolane-movee","type":"uplink","timestamp":"2018-08-20T17:44:37.048Z","count":3130,"payload":[{"timestamp":"2018-08-20T17:44:37.048Z","data":{"battery":3.5975599999999996,"temperature":27}}],"payload_encrypted":"9da25e36","payload_cleartext":"fe1b01aa","device_properties":{"appeui":"7ca97df000001190","deveui":"7ca97d0000001bb0","external_id":"Product: 3.7 / HW: 3.1 / SW: 1.8.8","no_de_serie_eolane":"4904","no_emballage":"S02066","product_version":"1.3.1"},"protocol_data":{"AppNonce":"e820ef","DevAddr":"0e6c5fda","DevNonce":"85bc","NetID":"000007","best_gateway_id":"M40246","gateway.

The schema of this dataframe is

message.printSchema()
root


 |-- Id: string (nullable = true)
 |-- publishTime: string (nullable = true)
 |-- data: string (nullable = true)

My aim is to work on the data column which holds json data and to flatten it. I wrote the following code

val schemaTotal = new StructType (
Array (StructField("id",StringType,false),StructField("device_id",StringType),StructField("group_id",LongType), StructField("group",StringType),StructField("profile_id",IntegerType),StructField("profile",StringType),StructField("type",StringType),StructField("timestamp",StringType),
StructField("count",StringType),
StructField("payload",new StructType ()
.add("timestamp",StringType)
.add("data",new ArrayType (new StructType().add("battery",LongType).add("temperature",LongType),false))),
StructField("payload_encrypted",StringType),
StructField("payload_cleartext",StringType),
StructField("device_properties", new ArrayType (new StructType().add("appeui",StringType).add("deveui",StringType).add("external_id",StringType).add("no_de_serie_eolane",LongType).add("no_emballage",StringType).add("product_version",StringType),false)),
StructField("protocol_data", new ArrayType (new StructType().add("AppNonce",StringType).add("DevAddr",StringType).add("DevNonce",StringType).add("NetID",LongType).add("best_gateway_id",StringType).add("gateways",IntegerType).add("lora_version",IntegerType).add("noise",LongType).add("port",IntegerType).add("rssi",DoubleType).add("sf",IntegerType).add("signal",DoubleType).add("snr",DoubleType),false)),
StructField("lat",StringType),
StructField("lng",StringType),
StructField("geolocation_type",StringType),
StructField("geolocation_precision",StringType),
StructField("delivered_at",StringType)))


val dataframe_extract=message.select($"Id",
$"publishTime",
from_json($"data",schemaTotal).as("content"))

val table = dataframe_extract.select(
$"Id",
$"publishTime",
$"content.id" as "id",
$"content.device_id" as "device_id",
$"content.group_id" as "group_id",
$"content.group" as "group",
$"content.profile_id" as "profile_id",
$"content.profile" as "profile",
$"content.type" as "type",
$"content.timestamp" as "timestamp",
$"content.count" as "count",
$"content.payload.timestamp" as "timestamp2",
$"content.payload.data.battery" as "battery",
$"content.payload.data.temperature" as "temperature",
$"content.payload_encrypted" as "payload_encrypted",
$"content.payload_cleartext" as "payload_cleartext",
$"content.device_properties.appeui" as "appeui"
)

table.show() gives me null values for all columns:

    +---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|             Id|         publishTime|  id|device_id|group_id|group|profile_id|profile|type|timestamp|count|timestamp2|battery|temperature|payload_encrypted|payload_cleartext|appeui|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+
|118738748835150|2018-08-20T17:44:...|null|     null|    null| null|      null|   null|null|     null| null|      null|   null|       null|             null|             null|  null|
+---------------+--------------------+----+---------+--------+-----+----------+-------+----+---------+-----+----------+-------+-----------+-----------------+-----------------+------+

, whereas table.printSchema() gives me the expected result, any idea how to solve this, please? I am working with Zeppelin as a first prototyping step thanks a lot in advance for your help. Best Regards

like image 842
scalacode Avatar asked Jan 27 '23 13:01

scalacode


1 Answers

from_json() SQL function has below constraint to be followed to convert column value to a dataframe.

  1. whatever the datatype you have defined in the schema should match with the value present in the json, if there is any column's mismatch value leads to null in all column values

e.g.:

'{"name": "raj", "age": 12}' for this column value

StructType(List(StructField(name,StringType,true),StructField(age,StringType,true)))

The above schema will return you a null value on both the columns

StructType(List(StructField(name,StringType,true),StructField(age,IntegerType,true)))

The above schema will return you an expected dataframe

For this thread possible reason could be this, if there is any mismatched column value present, from_json will return all column value as null

like image 141
Dharaneesh Vrd Avatar answered Jan 31 '23 20:01

Dharaneesh Vrd