 

Operation on Data Frame

I use the DataFrame API from Spark 1.3.

I would like to get the day of the week from a date in a DataFrame without losing all elements of the DataFrame.

Before switching to the DataFrame API, I used Joda-Time to get it inside a simple map.

Right now, this solution works:

sqlContext.createDataFrame(myDataFrame.map(l => operationOnTheField(l)), myDataFrame.schema)

Is it possible to do the operation without going back to a map on an RDD[Row] and then creating a DataFrame from that RDD?
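A minimal sketch of the select()-plus-UDF approach the answers below describe, applied to the day-of-week case. It assumes the date is stored as an ISO-8601 string column called `date` (the `Event` case class, its data, and the column names are hypothetical) and swaps in `java.time.LocalDate` (Java 8+) for Joda-Time. It is written as a standalone script that starts its own local SparkContext; inside spark-shell you would reuse the existing `sc` and `sqlContext` instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf
import java.time.LocalDate

// Hypothetical record type; the date is an ISO-8601 string such as "2015-03-19"
case class Event(id: Int, date: String)

val sc = new SparkContext(new SparkConf().setAppName("day-of-week").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val events = sc.parallelize(Seq(Event(1, "2015-03-19"), Event(2, "2015-03-21"))).toDF()

// The per-value logic that previously lived in the RDD map, wrapped as a UDF
// (java.time used here instead of Joda-Time; returns e.g. "THURSDAY")
val dayOfWeek = udf { (s: String) => LocalDate.parse(s).getDayOfWeek.toString }

// Keep every existing column and append the computed one: no RDD[Row] round trip
val allCols = events.columns.map(c => events.col(c)) :+ dayOfWeek(events("date")).as("dayOfWeek")
val withDay = events.select(allCols: _*)

val rows = withDay.collect()
rows.foreach(println)
sc.stop()
```

Because every original column is passed to select() alongside the new one, nothing is dropped from the DataFrame.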

asked Mar 19 '15 at 17:03 by jnaour

2 Answers

You can use a combination of calling select() on the DataFrame and a user-defined function (UDF) to transform the column in question.

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._

Define a case class to set up the example DataFrame.

private case class Cust(id: Integer, name: String, 
        sales: Double, discount: Double, state: String)

Then set up a SQLContext and create the DataFrame as follows (sc is assumed to be an existing SparkContext):

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val custs = Seq(
  Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
  Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
  Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
  Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
  Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
val customerDF = sc.parallelize(custs, 4).toDF()

Define a simple UDF that you'll use to transform the "discount" column.

val myFunc = udf {(x: Double) => x + 1}

Get all the columns, plus a reference to the "discount" column that the UDF will be applied to; the other columns will be passed through unchanged.

val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")

I'd like to find a "better" way to match the column, but the following works. Use as() to give the transformed column a new name, just because we can!

val mappedCols = cols.map(c => 
  if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)

Use select() to produce the new DataFrame:

val newDF = customerDF.select(mappedCols:_*)

You've changed

id name            sales    discount state
1  Widget Co       120000.0 0.0      AZ   
2  Acme Widgets    410500.0 500.0    CA   
3  Widgetry        410500.0 200.0    CA   
4  Widgets R Us    410500.0 0.0      CA   
5  Ye Olde Widgete 500.0    0.0      MA   

into

id name            sales    transformed state
1  Widget Co       120000.0 1.0         AZ   
2  Acme Widgets    410500.0 501.0       CA   
3  Widgetry        410500.0 201.0       CA   
4  Widgets R Us    410500.0 1.0         CA   
5  Ye Olde Widgete 500.0    1.0         MA   
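One way to avoid comparing Column.toString() values, sketched here as a standalone script, is to decide by column name, since `columns` already returns the names as plain strings. The data is a trimmed two-row copy of the answer's customer example; the local master and app name are illustrative choices.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

case class Cust(id: Int, name: String, sales: Double, discount: Double, state: String)

val sc = new SparkContext(new SparkConf().setAppName("udf-by-name").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val customerDF = sc.parallelize(Seq(
  Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
  Cust(2, "Acme Widgets", 410500.00, 500.00, "CA")
)).toDF()

val myFunc = udf { (x: Double) => x + 1 }

// Match on the column name: only "discount" gets the UDF, the rest pass through
val mappedCols = customerDF.columns.map { name =>
  if (name == "discount") myFunc(customerDF(name)).as("transformed")
  else customerDF(name)
}
val newDF = customerDF.select(mappedCols: _*)

val rows = newDF.collect()
val newColumns = newDF.columns.toSeq
rows.foreach(println)
sc.stop()
```

The column order is preserved, so the transformed column stays in the position "discount" occupied.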

You can find the full example source code here. You can make it simpler if you're not fussy about an exact column replacement.
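As a sketch of that simpler route, assuming the exact position and name of the replaced column do not matter: `withColumn` (available on DataFrame since Spark 1.3) appends the UDF result as a new last column and leaves "discount" untouched. The data and setup are an illustrative, trimmed copy of the answer's example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

case class Cust(id: Int, name: String, sales: Double, discount: Double, state: String)

val sc = new SparkContext(new SparkConf().setAppName("udf-withColumn").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val customerDF = sc.parallelize(Seq(
  Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
  Cust(2, "Acme Widgets", 410500.00, 500.00, "CA")
)).toDF()

val myFunc = udf { (x: Double) => x + 1 }

// Appends "transformed" after the existing columns; "discount" is kept as-is
val newDF = customerDF.withColumn("transformed", myFunc(customerDF("discount")))

val rows = newDF.collect()
val newColumns = newDF.columns.toSeq
rows.foreach(println)
sc.stop()
```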

answered Nov 04 '22 at 06:11 by Spiro Michaylov


Try this:

Table.select(Table("Otherkey"), MyUdf(Table("ColNeeded")).as("UdfTransformed"))

where MyUdf is a UDF you define yourself.
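For illustration only, MyUdf could be defined as below; the `Rec` case class, the `Table` data, and the upper-casing logic are hypothetical stand-ins for your own table and transformation. Note that select() returns only the columns you list, so any other column you want to keep (here `Extra`) has to be named explicitly.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

// Hypothetical table layout matching the column names used in the answer
case class Rec(Otherkey: Int, ColNeeded: String, Extra: Double)

val sc = new SparkContext(new SparkConf().setAppName("my-udf").setMaster("local[2]"))
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val Table = sc.parallelize(Seq(Rec(1, "a", 0.5), Rec(2, "b", 1.5))).toDF()

// Hypothetical UDF: upper-cases the string column
val MyUdf = udf { (s: String) => s.toUpperCase }

// Only Otherkey and the transformed column survive; Extra is dropped
val result = Table.select(Table("Otherkey"), MyUdf(Table("ColNeeded")).as("UdfTransformed"))

val rows = result.collect()
val resultColumns = result.columns.toSeq
rows.foreach(println)
sc.stop()
```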

answered Nov 04 '22 at 04:11 by skywalkerytx