I use the DataFrame API from Spark 1.3.
I would like to get the day of the week from a date in a DataFrame without losing all elements of the DataFrame.
Before switching to the DataFrame API, I used Joda-Time to compute it inside a simple map.
Right now, this solution works:
sqlContext.createDataFrame(myDataFrame.map(l => operationOnTheField(l)), myDataFrame.schema)
Is it possible to do the operation without going back to a map on an RDD[Row] and then creating a DataFrame from that RDD?
You can combine a call to select() on the DataFrame with a user-defined function (UDF) that transforms the column in question.
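Applied to the original question, the UDF's job is the same per-row date logic that used to run inside the map. Below is a minimal sketch of that logic as a plain Scala function. It assumes the date column holds ISO-8601 strings such as "2015-04-13" and swaps in java.time (JDK 8+) for Joda-Time; the object and function names are hypothetical.

```scala
import java.time.LocalDate
import java.time.format.TextStyle
import java.util.Locale

object DayOfWeekExample {
  // Assumption: dates arrive as ISO-8601 strings ("yyyy-MM-dd").
  // java.time is used here in place of Joda-Time.
  def dayOfWeek(isoDate: String): String =
    LocalDate.parse(isoDate)
      .getDayOfWeek
      .getDisplayName(TextStyle.FULL, Locale.ENGLISH) // e.g. "Monday"
}
```

Inside Spark the function could then be wrapped with udf(DayOfWeekExample.dayOfWeek _) and applied in a select() or withColumn() call, which keeps the other columns of the DataFrame instead of rebuilding it from an RDD[Row].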
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._
A case class to set up the example DataFrame:
private case class Cust(id: Integer, name: String,
sales: Double, discount: Double, state: String)
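Then set up a SQLContext and create the DataFrame as follows:
// App name and local master are illustrative; adjust for your cluster
val conf = new SparkConf().setAppName("UDFTransform").setMaster("local[4]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._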
val custs = Seq(
Cust(1, "Widget Co", 120000.00, 0.00, "AZ"),
Cust(2, "Acme Widgets", 410500.00, 500.00, "CA"),
Cust(3, "Widgetry", 410500.00, 200.00, "CA"),
Cust(4, "Widgets R Us", 410500.00, 0.0, "CA"),
Cust(5, "Ye Olde Widgete", 500.00, 0.0, "MA")
)
val customerDF = sc.parallelize(custs, 4).toDF()
Define a simple UDF that you'll use to transform the "discount" column.
val myFunc = udf {(x: Double) => x + 1}
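Because udf only wraps an ordinary Scala function, the transformation logic can be checked without Spark. A small sketch (object and value names are illustrative) that applies the same x + 1 function to the discount values of the five sample rows:

```scala
object DiscountUdfCheck {
  // The plain function that udf { (x: Double) => x + 1 } wraps.
  val addOne: Double => Double = x => x + 1

  // Discount values of the sample Cust rows, in row order.
  val discounts: Seq[Double] = Seq(0.00, 500.00, 200.00, 0.0, 0.0)

  // What the "transformed" column should contain after the UDF runs.
  val transformed: Seq[Double] = discounts.map(addOne)
}
```

In Spark the function value could be passed straight to udf, e.g. udf(DiscountUdfCheck.addOne), which is equivalent to the lambda form above.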
Get the columns, having applied the UDF to the "discount" column and leaving the others as they were.
val colNames = customerDF.columns
val cols = colNames.map(cName => customerDF.col(cName))
val theColumn = customerDF("discount")
I'd like to find a "better" way to match the column, but the following works.
Use as() to give the column a new name, just because we can!
val mappedCols = cols.map(c =>
if (c.toString() == theColumn.toString()) myFunc(c).as("transformed") else c)
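One way to avoid comparing toString() output is to match on the column names themselves. Below is a sketch of a hypothetical helper, replaceColumn, written generically so it can be checked with plain strings; with Spark it would be instantiated with Column values.

```scala
object ReplaceByName {
  // Hypothetical helper: map over the column names in their original order,
  // applying `transform` to the column called `target` and `keep` to all others.
  def replaceColumn[A](names: Seq[String], target: String)(keep: String => A)(
      transform: String => A): Seq[A] =
    names.map(n => if (n == target) transform(n) else keep(n))
}
```

With Spark, something like replaceColumn[Column](customerDF.columns.toSeq, "discount")(col)(n => myFunc(col(n)).as("transformed")) should yield the same list as mappedCols, ready to pass to select(...: _*). This assumes org.apache.spark.sql.Column is imported alongside functions._.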
Use select() to produce the new DataFrame
val newDF = customerDF.select(mappedCols:_*)
You've changed

id  name             sales     discount  state
1   Widget Co        120000.0  0.0       AZ
2   Acme Widgets     410500.0  500.0     CA
3   Widgetry         410500.0  200.0     CA
4   Widgets R Us     410500.0  0.0       CA
5   Ye Olde Widgete  500.0     0.0       MA

into

id  name             sales     transformed  state
1   Widget Co        120000.0  1.0          AZ
2   Acme Widgets     410500.0  501.0        CA
3   Widgetry         410500.0  201.0        CA
4   Widgets R Us     410500.0  1.0          CA
5   Ye Olde Widgete  500.0     1.0          MA
You can find the full example source code here. You can make it simpler if you're not fussy about an exact column replacement.
Try this:
Table.select(Table("Otherkey"), MyUdf(Table("ColNeeded")).as("UdfTransformed"))
Here MyUdf is a UDF that you define yourself. Note that the result contains only the columns listed in select().