I have a pyspark dataframe consisting of one column, called `json`, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.
```python
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1), Row(json=jstr2), Row(json=jstr3)])
```
I've tried mapping over each row with `json.loads`:
```python
(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()
```
But this returns a `TypeError: expected string or buffer`.
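A minimal sketch outside Spark of what I suspect is going on: `json.loads` is being handed the whole `Row` object rather than the string inside it (here a `namedtuple` stands in for pyspark's `Row`, which is also a tuple subclass):

```python
import json
from collections import namedtuple

# Stand-in for pyspark.sql.Row, which is itself a tuple subclass
Row = namedtuple('Row', ['json'])

row = Row(json='{"header": {"id": 12345}}')

# Passing the Row itself fails, as in the rdd.map above
try:
    json.loads(row)
except TypeError as e:
    print('TypeError:', e)

# Extracting the string field first parses fine
parsed = json.loads(row.json)
print(parsed['header']['id'])  # 12345
```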
I suspect that part of the problem is that when converting from a `dataframe` to an `rdd`, the schema information is lost, so I've also tried manually entering in the schema info:
```python
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
```
But I get the same `TypeError`.
Looking at this answer, it looks like flattening out the rows with `flatMap` might be useful here, but I'm not having success with that either:
```python
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
```
This time I get the error: `AttributeError: 'unicode' object has no attribute 'get'`.
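A guess at why, sketched without Spark: the second `flatMap` iterates over the dict that `json.loads` returns, and iterating a dict yields its *keys* as strings, so the downstream `.get('body')` ends up being called on a plain string:

```python
import json

jstr = '{"header": {"id": 12345}, "body": {"id": 111000}}'

# flatMap(json.loads) effectively iterates the parsed dict,
# which yields its keys, not the nested dicts
flattened = list(json.loads(jstr))
print(flattened)  # ['header', 'body']

# So the next stage sees plain strings, and strings have no .get
try:
    flattened[0].get('body')
except AttributeError as e:
    print('AttributeError:', e)
```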
For Spark 2.1+, you can use `from_json`, which allows the preservation of the other non-json columns within the dataframe as follows:
```python
from pyspark.sql.functions import from_json, col

json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))
```
You let Spark derive the schema of the json string column. Then the `df.json` column is no longer a `StringType` but the correctly decoded json structure, i.e., a nested `StructType`, and all the other columns of `df` are preserved as-is.
You can access the json content as follows:
```python
df.select(col('json.header').alias('header'))
```
Converting a dataframe with json strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets).
For example:
```python
>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
 |-- body: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- sub_json: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- sub_sub_json: struct (nullable = true)
 |    |    |    |-- col1: long (nullable = true)
 |    |    |    |-- col2: string (nullable = true)
 |-- header: struct (nullable = true)
 |    |-- foo: string (nullable = true)
 |    |-- id: long (nullable = true)
```
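The nesting in that schema mirrors exactly what `json.loads` produces for one of the sample strings; a quick sanity check with nothing but the stdlib (reusing `jstr1` from the question):

```python
import json

jstr1 = '{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'

parsed = json.loads(jstr1)

# Top-level keys become the top-level struct fields
print(sorted(parsed))  # ['body', 'header']

# The nested dicts correspond to the nested StructTypes in the schema
print(parsed['body']['sub_json']['sub_sub_json']['col1'])  # 20
```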