PySpark: Parse a column of JSON strings

I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])

I've tried mapping over each row with json.loads:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

But this returns a TypeError: expected string or buffer

I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually entering in the schema info:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

But I get the same TypeError.

Looking at this answer, it looks like flattening out the rows with flatMap might be useful here, but I'm not having success with that either:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

I get this error: AttributeError: 'unicode' object has no attribute 'get'.
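(For reference, both errors can be reproduced outside Spark: each RDD element is a Row wrapper rather than the raw string, and flatMap over a parsed dict iterates over its keys, which are strings. A plain-Python sketch, using a namedtuple as a stand-in for pyspark's Row:)

```python
import json
from collections import namedtuple

# Stand-in for pyspark.sql.Row: a tuple-like wrapper around the string
Row = namedtuple('Row', ['json'])
rows = [Row(json='{"header": {"id": 1}, "body": {"name": "foobar"}}')]

# 1) map(json.loads) passes the whole Row, not the string -> TypeError
try:
    json.loads(rows[0])
except TypeError as e:
    print('TypeError:', e)

# 2) flatMap(json.loads) iterates over the parsed dict, which yields
#    its *keys* (strings), so a later .get('body') hits a string
parsed = json.loads(rows[0].json)
keys = list(parsed)  # flatMap would emit 'header', 'body'
try:
    keys[0].get('body')
except AttributeError as e:
    print('AttributeError:', e)
```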

Steve asked Dec 12 '16



2 Answers

For Spark 2.1+, you can use from_json, which preserves the other non-JSON columns of the dataframe, as follows:

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

This lets Spark derive the schema of the JSON string column. The df.json column is then no longer a StringType but the correctly decoded JSON structure, i.e. a nested StructType, and all the other columns of df are preserved as-is.

You can access the json content as follows:

df.select(col('json.header').alias('header')) 
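As a plain-Python illustration (not Spark itself), from_json conceptually parses each string into a nested structure, so dotted paths like json.header resolve to nested fields. Using the first sample string from the question:

```python
import json

jstr1 = '{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'

decoded = json.loads(jstr1)
# 'json.header' in Spark corresponds to the nested dict here
print(decoded['header'])  # {'id': 12345, 'foo': 'bar'}
# Deeper paths chain the same way
print(decoded['body']['sub_json']['sub_sub_json']['col1'])  # 20
```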
Martin Tapp answered Sep 19 '22


Converting a dataframe with JSON strings to a structured dataframe is actually quite simple in Spark if you first convert the dataframe to an RDD of strings (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)

For example:

>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
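The schema above comes from spark.read.json scanning the records and inferring a type for each field. A toy sketch of that kind of type inference over sample records (plain Python; this is not Spark's actual algorithm, and the type names are just Spark-like labels for illustration):

```python
import json

samples = [
    '{"header":{"id":12345,"foo":"bar"}}',
    '{"header":{"id":12346,"foo":"baz"}}',
]

def infer(value):
    # Map Python types to Spark-like type names (toy version).
    # bool must be checked before int: bool is a subclass of int.
    if isinstance(value, bool):
        return 'boolean'
    if isinstance(value, int):
        return 'long'
    if isinstance(value, str):
        return 'string'
    if isinstance(value, dict):
        return {k: infer(v) for k, v in value.items()}
    return 'unknown'

schema = infer(json.loads(samples[0]))
print(schema)  # {'header': {'id': 'long', 'foo': 'string'}}
```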
Mariusz answered Sep 20 '22