
Derive multiple columns from a single column in a Spark DataFrame

I have a DataFrame with a large amount of parseable metadata stored as a single string column; let's call the DataFrame DFA and the column ColmnA.

I would like to break this column, ColmnA, into multiple columns through a function, ClassXYZ = Func1(ColmnA). This function returns an instance of a class ClassXYZ with multiple fields, and each of these fields now has to be mapped to a new column, such as ColmnA1, ColmnA2, etc.

How would I do such a transformation from one DataFrame to another with these additional columns by calling Func1 just once, rather than repeating it to create each column?

It would be easy to solve if I called this expensive function once per new column, but that is what I wish to avoid.

Please advise with working or pseudo code.

Thanks

Sanjay

asked Aug 25 '15 by sshroff


People also ask

How do I split one column into multiple columns in Spark?

pyspark.sql.functions provides a split() function which is used to split a DataFrame string column into multiple columns.

How do I split a column in Scala Spark?

Spark SQL provides a split() function to convert a delimiter-separated String into an array (StringType to ArrayType) column on a DataFrame. This is done by splitting a string column on a delimiter such as a space, comma, or pipe, and converting the result into ArrayType.
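
For example, here is a minimal spark-shell sketch of split() on a comma-separated column; the DataFrame and column names are made up for illustration:

    import org.apache.spark.sql.functions.split

    // Hypothetical DataFrame with a comma-separated "name" column
    val people = Seq("John,Smith", "Jane,Doe").toDF("name")

    // split() yields an ArrayType column; getItem() picks out individual parts
    val parts = people
      .withColumn("first", split($"name", ",").getItem(0))
      .withColumn("last", split($"name", ",").getItem(1))

    parts.show
    // +----------+-----+-----+
    // |      name|first| last|
    // +----------+-----+-----+
    // |John,Smith| John|Smith|
    // |  Jane,Doe| Jane|  Doe|
    // +----------+-----+-----+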

How do I get columns from a data frame Spark?

You can get all the columns of a Spark DataFrame by using df.columns; it returns the column names as an Array[String].
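
Continuing with the hypothetical parts DataFrame from the sketch above:

    // columns returns the column names as an Array[String]
    val cols: Array[String] = parts.columns
    // cols: Array[String] = Array(name, first, last)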


1 Answer

Generally speaking, what you want is not directly possible. A UDF can return only a single column at a time. There are two different ways you can overcome this limitation:

  1. Return a column of complex type. The most general solution is a StructType but you can consider ArrayType or MapType as well.

    import org.apache.spark.sql.functions.udf

    val df = Seq(
      (1L, 3.0, "a"), (2L, -1.0, "b"), (3L, 0.0, "c")
    ).toDF("x", "y", "z")

    case class Foobar(foo: Double, bar: Double)

    val foobarUdf = udf((x: Long, y: Double, z: String) =>
      Foobar(x * y, z.head.toInt * y))

    val df1 = df.withColumn("foobar", foobarUdf($"x", $"y", $"z"))
    df1.show
    // +---+----+---+------------+
    // |  x|   y|  z|      foobar|
    // +---+----+---+------------+
    // |  1| 3.0|  a| [3.0,291.0]|
    // |  2|-1.0|  b|[-2.0,-98.0]|
    // |  3| 0.0|  c|   [0.0,0.0]|
    // +---+----+---+------------+

    df1.printSchema
    // root
    //  |-- x: long (nullable = false)
    //  |-- y: double (nullable = false)
    //  |-- z: string (nullable = true)
    //  |-- foobar: struct (nullable = true)
    //  |    |-- foo: double (nullable = false)
    //  |    |-- bar: double (nullable = false)

    This can easily be flattened later (a short sketch follows after this list), but usually there is no need for that.

  2. Switch to RDD, reshape and rebuild DF:

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.Row

    def foobarFunc(x: Long, y: Double, z: String): Seq[Any] =
      Seq(x * y, z.head.toInt * y)

    val schema = StructType(df.schema.fields ++
      Array(StructField("foo", DoubleType), StructField("bar", DoubleType)))

    val rows = df.rdd.map(r => Row.fromSeq(
      r.toSeq ++
      foobarFunc(r.getAs[Long]("x"), r.getAs[Double]("y"), r.getAs[String]("z"))))

    val df2 = sqlContext.createDataFrame(rows, schema)

    df2.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
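
As a sketch of the flattening mentioned under option 1 (assuming the df1 built in that example), the struct fields can simply be selected out by name:

    // Pull the struct fields out of "foobar" into top-level columns
    val flattened = df1.select($"x", $"y", $"z", $"foobar.foo", $"foobar.bar")

    flattened.show
    // +---+----+---+----+-----+
    // |  x|   y|  z| foo|  bar|
    // +---+----+---+----+-----+
    // |  1| 3.0|  a| 3.0|291.0|
    // |  2|-1.0|  b|-2.0|-98.0|
    // |  3| 0.0|  c| 0.0|  0.0|
    // +---+----+---+----+-----+
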
answered Oct 09 '22 by zero323