I'm currently trying to extract a database from MongoDB and use Spark to ingest it into ElasticSearch with geo_points.

The Mongo database has latitude and longitude values, but ElasticSearch requires them to be cast into the geo_point type.

Is there a way in Spark to copy the lat and lon columns to a new column that is an array or struct?
Any help is appreciated!
Spark SQL's StructType and StructField classes are used to programmatically specify the schema of a DataFrame and to create complex columns such as nested struct, array, and map columns. A StructType is a collection of StructFields.
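For illustration, here is a minimal sketch of such a schema; the field names (name, location, tags, attributes) are invented for this example and are not taken from the question:

import org.apache.spark.sql.types._

// A StructType is just a sequence of StructFields; nested struct, array
// and map columns are expressed with StructType, ArrayType and MapType.
val exampleSchema = StructType(
  StructField("name", StringType, nullable = false) ::
  StructField("location", StructType(
    StructField("lat", DoubleType, nullable = false) ::
    StructField("long", DoubleType, nullable = false) :: Nil
  )) ::
  StructField("tags", ArrayType(StringType)) ::
  StructField("attributes", MapType(StringType, StringType)) :: Nil
)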
You can add multiple columns to a Spark DataFrame in several ways. If you want to add a known set of columns, you can do so by chaining withColumn() calls or with a single select(). If you need to add multiple columns after applying some transformations, you can use map() or foldLeft() instead, as sketched below.
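As a rough sketch (the column names and lit() values below are made up, and df stands for any existing DataFrame, e.g. the one built in the answer further down):

import org.apache.spark.sql.functions.lit

// Known, fixed set of columns: chain withColumn() (or do it in one select()).
val withKnownCols = df.
  withColumn("source", lit("mongodb")).
  withColumn("ingested", lit(true))

// Columns whose names are only known at runtime: foldLeft over the names.
val extraCols = Seq("a", "b", "c")
val withManyCols = extraCols.foldLeft(df) { (acc, name) =>
  acc.withColumn(name, lit(0))
}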
The same applies in PySpark: a StructType holds a list of StructField objects that define the structure of the DataFrame, and it is what the DataFrame reports as its schema.
I assume you start with some kind of flat schema like this:
root
 |-- lat: double (nullable = false)
 |-- long: double (nullable = false)
 |-- key: string (nullable = false)
First let's create some example data:
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types._

val rdd = sc.parallelize(
  Row(52.23, 21.01, "Warsaw") :: Row(42.30, 9.15, "Corte") :: Nil)

val schema = StructType(
  StructField("lat", DoubleType, false) ::
  StructField("long", DoubleType, false) ::
  StructField("key", StringType, false) :: Nil)

val df = sqlContext.createDataFrame(rdd, schema)
An easy way is to use a udf and a case class:
case class Location(lat: Double, long: Double)

val makeLocation = udf((lat: Double, long: Double) => Location(lat, long))

val dfRes = df.
   withColumn("location", makeLocation(col("lat"), col("long"))).
   drop("lat").
   drop("long")

dfRes.printSchema
and we get
root
 |-- key: string (nullable = false)
 |-- location: struct (nullable = true)
 |    |-- lat: double (nullable = false)
 |    |-- long: double (nullable = false)
A harder way is to transform your data and apply the schema afterwards:
val rddRes = df.
   map{case Row(lat, long, key) => Row(key, Row(lat, long))}

val schemaRes = StructType(
  StructField("key", StringType, false) ::
  StructField("location", StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) :: Nil
  ), true) :: Nil
)

sqlContext.createDataFrame(rddRes, schemaRes).show
and we get the expected output:
+------+-------------+
|   key|     location|
+------+-------------+
|Warsaw|[52.23,21.01]|
| Corte|  [42.3,9.15]|
+------+-------------+
Creating a nested schema from scratch can be tedious, so if you can I would recommend the first approach. It can easily be extended if you need a more sophisticated structure:
case class Pin(location: Location)

val makePin = udf((lat: Double, long: Double) => Pin(Location(lat, long)))

df.
   withColumn("pin", makePin(col("lat"), col("long"))).
   drop("lat").
   drop("long").
   printSchema
and we get the expected output:
root
 |-- key: string (nullable = false)
 |-- pin: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- lat: double (nullable = false)
 |    |    |-- long: double (nullable = false)
Unfortunately you have no control over the nullable field, so if it is important for your project you'll have to specify the schema explicitly.
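If non-nullable fields do matter, one possible workaround (not part of the original answer, just a sketch) is to re-apply an explicit schema to the result of the udf approach:

import org.apache.spark.sql.types._

// Rebuild the DataFrame from its RDD with a schema that marks the
// nested fields as non-nullable. dfRes is the result of the udf approach above.
val strictSchema = StructType(
  StructField("key", StringType, false) ::
  StructField("location", StructType(
    StructField("lat", DoubleType, false) ::
    StructField("long", DoubleType, false) :: Nil
  ), false) :: Nil
)

val dfStrict = sqlContext.createDataFrame(dfRes.rdd, strictSchema)
dfStrict.printSchema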
Finally you can use the struct function introduced in 1.4:

import org.apache.spark.sql.functions.struct

df.select($"key", struct($"lat", $"long").alias("location"))
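Since the end goal is an Elasticsearch geo_point, two hedged variations may be useful; this is only a sketch and assumes your index already maps location as geo_point (the indexing itself, e.g. via the es-hadoop connector, is not covered here):

import org.apache.spark.sql.functions.{array, col, struct}

// Object form of geo_point: Elasticsearch expects the nested fields to be
// named "lat" and "lon", so the "long" column is renamed inside the struct.
val dfGeoStruct = df.select(col("key"),
  struct(col("lat"), col("long").alias("lon")).alias("location"))

// Array form of geo_point: values go in [lon, lat] order.
val dfGeoArray = df.select(col("key"),
  array(col("long"), col("lat")).alias("location"))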