 

How to add a new Struct column to a DataFrame


I'm currently trying to extract a database from MongoDB and use Spark to ingest it into ElasticSearch with geo_points.

The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast to the geo_point type.

Is there a way in Spark to copy the lat and lon columns to a new column that is an array or struct?

Any help is appreciated!

Asked Jul 24 '15 by Kim Ngo

People also ask

What is a struct column?

Spark SQL's StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. A StructType is a collection of StructFields.
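For illustration, a minimal sketch of such a nested schema (the field names are hypothetical, chosen to mirror the question):

import org.apache.spark.sql.types._

// A hypothetical nested schema: a struct column "location" holding two doubles.
val exampleSchema = StructType(Seq(
  StructField("key", StringType, nullable = false),
  StructField("location", StructType(Seq(
    StructField("lat", DoubleType, nullable = false),
    StructField("long", DoubleType, nullable = false)
  )), nullable = true)
))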

How do I add a column to a Spark DataFrame?

You can add multiple columns to a Spark DataFrame in several ways. If you want to add a known set of columns, you can do so easily by chaining withColumn() or by using select(). However, sometimes you may need to add multiple columns after applying some transformations; in that case you can use either map() or foldLeft().
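A minimal sketch of both patterns, given some DataFrame df (the column names and values here are hypothetical):

import org.apache.spark.sql.functions.lit

// Known set of columns: chain withColumn calls.
val withKnown = df.withColumn("source", lit("mongo")).withColumn("version", lit(1))

// Dynamic set of columns: foldLeft over a list of names.
val newCols = Seq("a", "b", "c")
val withDynamic = newCols.foldLeft(df)((acc, name) => acc.withColumn(name, lit(0)))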

How do you use a struct in PySpark?

PySpark's StructType is the way to define the structure of a DataFrame. A StructType contains a list of StructFields that describe the schema of the DataFrame. Defining it explicitly removes the dependency on schema inference, and a DataFrame's schema is itself returned as a StructType.


1 Answer

I assume you start with some kind of flat schema like this:

root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)

First, let's create some example data:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
    Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) ::
    StructField("key", StringType, false) :: Nil)

val df = sqlContext.createDataFrame(rdd, schema)

An easy way is to use a UDF and a case class:

case class Location(lat: Double, long: Double)
val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
    withColumn("location", makeLocation(col("lat"), col("long"))).
    drop("lat").
    drop("long")

dfRes.printSchema

and we get

root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)

A hard way is to transform your data and apply the schema afterwards:

val rddRes = df.
    map{ case Row(lat, long, key) => Row(key, Row(lat, long)) }

val schemaRes = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), true) :: Nil
)

sqlContext.createDataFrame(rddRes, schemaRes).show

and we get the expected output:

+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+

Creating a nested schema from scratch can be tedious, so if you can, I would recommend the first approach. It can be easily extended if you need a more sophisticated structure:

case class Pin(location: Location)
val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))

df.
    withColumn("pin", makePin(col("lat"), col("long"))).
    drop("lat").
    drop("long").
    printSchema

and we get the expected output:

root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)

Unfortunately, you have no control over the nullable field this way, so if it is important for your project you'll have to specify the schema explicitly.
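A minimal sketch of what that could look like, assuming the dfRes from above (re-applying an explicit schema over the UDF result to pin down nullability):

import org.apache.spark.sql.types._

// Hypothetical strict schema: declare the struct and its fields non-nullable.
val strictSchema = StructType(
    StructField("key", StringType, false) ::
    StructField("location", StructType(
        StructField("lat", DoubleType, false) ::
        StructField("long", DoubleType, false) :: Nil
    ), false) :: Nil
)

// Re-create the DataFrame from the underlying rows with the explicit schema.
val dfStrict = sqlContext.createDataFrame(dfRes.rdd, strictSchema)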

Finally, you can use the struct function introduced in Spark 1.4:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))
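Tying this back to the original goal, one possible sketch for Elasticsearch: the object form of geo_point expects fields named lat and lon, so you could alias the long column inside the struct (assuming your Spark version keeps aliases as struct field names):

import org.apache.spark.sql.functions.struct

// Rename "long" to "lon" inside the struct so Elasticsearch can map it as a geo_point.
val dfGeo = df.select($"key", struct($"lat", $"long".alias("lon")).alias("location"))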
Answered Oct 24 '22 by zero323