Suppose I have a JSON file whose lines have the following structure:
{
    "a": 1,
    "b": {
        "bb1": 1,
        "bb2": 2
    }
}
I want to change the value of the key bb1 or add a new key, such as bb3.
Currently, I use spark.read.json to load the JSON file into Spark as a DataFrame and df.rdd.map to map each Row of the RDD to a dict. Then I change a nested key's value or add a nested key and convert the dict back to a Row. Finally, I convert the RDD to a DataFrame.
The workflow is as follows:
def map_func(row):
    dictionary = row.asDict(True)
    # add a new key or change a key's value here
    return as_row(dictionary)  # as_row converts a dict to a Row recursively
df = spark.read.json("json_file")
df.rdd.map(map_func).toDF().write.json("new_json_file")
This works for me, but I am concerned that converting DataFrame -> RDD (Row -> dict -> Row) -> DataFrame would hurt efficiency. Is there another method that meets this need without sacrificing efficiency?
The solution I finally used relies on withColumn and on dynamically building the schema of b.
First, we can get b_schema from the DataFrame schema:
b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b')
After that, b_schema is a dict, and we can add a new field to it:
b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True})
Then we can convert it to a StructType:
new_b = StructType.fromJson(b_schema)
In map_func, we can convert the Row to a dict and populate the new field:
from pyspark.sql.functions import udf
def map_func(row):
    data = row.asDict(True)
    data['bb3'] = data['bb1'] + data['bb2']
    return data
map_udf = udf(map_func, new_b)
df.withColumn('b', map_udf('b')).collect()
Thanks @Mariusz
In PySpark, to add a new column with a constant value to a DataFrame, use the lit() function (from pyspark.sql.functions import lit). lit() takes a constant value and returns a Column; to add a NULL value, use lit(None).
You can update a PySpark DataFrame column using withColumn(), select(), or sql(). Because DataFrames are distributed, immutable collections, column values cannot be changed in place; withColumn() and the other approaches return a new DataFrame with the updated values.
You can use map_func as a UDF and so skip the DataFrame -> RDD -> DataFrame conversion, while keeping the flexibility of Python for implementing business logic. All you need is to create a schema object:
>>> from pyspark.sql.types import *
>>> new_b = StructType([StructField('bb1', LongType()), StructField('bb2', LongType()), StructField('bb3', LongType())])
Then you define map_func and the UDF:
>>> from pyspark.sql.functions import *
>>> def map_func(data):
...     return {'bb1': 4, 'bb2': 5, 'bb3': 6}
...
>>> map_udf = udf(map_func, new_b)
Finally, apply this UDF to the DataFrame:
>>> df = spark.read.json('sample.json')
>>> df.withColumn('b', map_udf('b')).first()
Row(a=1, b=Row(bb1=4, bb2=5, bb3=6))
EDIT:
According to the comment: you can add a field to an existing StructType in an easier way, for example:
>>> df = spark.read.json('sample.json')
>>> new_b = df.schema['b'].dataType.add(StructField('bb3', LongType()))