
PySpark dataframe to_json() function

I have a DataFrame like the one below:

>>> df.show(10,False)
+-----+----+---+------+
|id   |name|age|salary|
+-----+----+---+------+
|10001|alex|30 |75000 |
|10002|bob |31 |80000 |
|10003|deb |31 |80000 |
|10004|john|33 |85000 |
|10005|sam |30 |75000 |
+-----+----+---+------+

I can convert the entire row of df into one new column "jsonCol":

>>> newDf1 = df.withColumn("jsonCol", to_json(struct([df[x] for x in df.columns])))
>>> newDf1.show(10,False)
+-----+----+---+------+--------------------------------------------------------+
|id   |name|age|salary|jsonCol                                                 |
+-----+----+---+------+--------------------------------------------------------+
|10001|alex|30 |75000 |{"id":"10001","name":"alex","age":"30","salary":"75000"}|
|10002|bob |31 |80000 |{"id":"10002","name":"bob","age":"31","salary":"80000"} |
|10003|deb |31 |80000 |{"id":"10003","name":"deb","age":"31","salary":"80000"} |
|10004|john|33 |85000 |{"id":"10004","name":"john","age":"33","salary":"85000"}|
|10005|sam |30 |75000 |{"id":"10005","name":"sam","age":"30","salary":"75000"} |
+-----+----+---+------+--------------------------------------------------------+

Instead of converting the entire row into a JSON string as in the step above, I need to select only a few columns based on the value of each field. I have included a sample condition in the command below.

But when I use the when function, the resulting JSON string loses the actual column names (keys); the keys only reflect each column's position instead of its real name.

>>> newDf2 = df.withColumn("jsonCol", to_json(struct([ when(col(x)!="  ",df[x]).otherwise(None) for x in df.columns])))
>>> newDf2.show(10,False)
+-----+----+---+------+---------------------------------------------------------+
|id   |name|age|salary|jsonCol                                                  |
+-----+----+---+------+---------------------------------------------------------+
|10001|alex|30 |75000 |{"col1":"10001","col2":"alex","col3":"30","col4":"75000"}|
|10002|bob |31 |80000 |{"col1":"10002","col2":"bob","col3":"31","col4":"80000"} |
|10003|deb |31 |80000 |{"col1":"10003","col2":"deb","col3":"31","col4":"80000"} |
|10004|john|33 |85000 |{"col1":"10004","col2":"john","col3":"33","col4":"85000"}|
|10005|sam |30 |75000 |{"col1":"10005","col2":"sam","col3":"30","col4":"75000"} |
+-----+----+---+------+---------------------------------------------------------+

I need to keep the when function but get results like newDf1, with the actual column names (keys). Can someone help me out?

asked Apr 01 '18 by vishnu ram


People also ask

How do you convert a Dataframe to a string in PySpark?

To convert an array to a string, PySpark SQL provides the built-in function concat_ws(), which takes a delimiter of your choice as the first argument and an array column (type Column) as the second. To use concat_ws(), import it from pyspark.sql.functions.
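
For example, a minimal sketch (the DataFrame and the column names name/langs are made up for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws

spark = SparkSession.builder.getOrCreate()
# Hypothetical DataFrame with an array column "langs"
arr_df = spark.createDataFrame([("alex", ["python", "scala"])], ["name", "langs"])
# Join the array elements into a single comma-separated string
arr_df.withColumn("langs_str", concat_ws(",", "langs")).show(truncate=False)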

How do you convert a string to a JSON object in Python?

Use the json.loads() function. json.loads() accepts a valid JSON string and converts it to a Python dictionary. This process is called deserialization – the act of converting a string to an object.
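
A minimal sketch, reusing one of the JSON strings from the question above:

import json

# Deserialize a JSON string into a Python dictionary
row = json.loads('{"id":"10001","name":"alex","age":"30","salary":"75000"}')
print(row["name"])  # alex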


1 Answer

You have used conditional expressions inside the struct function as columns. Spark assigns unnamed expressions positional names such as col1, col2, and so on, which is why you need alias to restore the original column names:

from pyspark.sql import functions as F

# .alias(x) keeps each column's original name, so to_json emits the real keys
newDf2 = df.withColumn("jsonCol", F.to_json(F.struct([F.when(F.col(x) != "  ", df[x]).otherwise(None).alias(x) for x in df.columns])))
newDf2.show(truncate=False)
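
Since none of the sample values equals the two-space string used in the condition, jsonCol should come out identical to newDf1's, but now with the real keys, e.g. {"id":"10001","name":"alex","age":"30","salary":"75000"} for the first row.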
answered Oct 18 '22 by Ramesh Maharjan