Is it possible, and what would be the most efficient and neat method, to add a column to a DataFrame?
More specifically, the column may serve as Row IDs for the existing DataFrame.
In a simplified case, reading from a file and not tokenizing it, I can think of something like the following (in Scala), but it completes with errors (at line 3), and anyway doesn't look like the best route possible:
var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
It's been a while since I posted the question, and it seems that some other people would like to get an answer as well. Below is what I found.
So the original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so the row order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:
import org.apache.spark.sql.Row

sc.textFile(file).
  zipWithIndex().
  map { case (d, i) => i.toString + delimiter + d }.
  map(_.split(delimiter)).
  map(s => Row.fromSeq(s.toSeq))
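To actually get a data frame back from that RDD[Row], you still need a matching schema and a createDataFrame call. A minimal sketch of that last step, assuming the RDD built above has been assigned to a val indexedRdd and the file splits into exactly two fields (the field names below are made up):

import org.apache.spark.sql.types.{StructField, StructType, StringType}

// First field holds the generated index; "colA"/"colB" stand in for the real split fields
val schema = StructType(Seq(
  StructField("ID", StringType, nullable = false),
  StructField("colA", StringType, nullable = true),
  StructField("colB", StringType, nullable = true)))

val indexedDF = sqlContext.createDataFrame(indexedRdd, schema)
indexedDF.show()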
Regarding the general case of appending any column to any data frame:
The "closest" to this functionality in Spark API are withColumn
and withColumnRenamed
. According to Scala docs, the former Returns a new DataFrame by adding a column. In my opinion, this is a bit confusing and incomplete definition. Both of these functions can operate on this
data frame only, i.e. given two data frames df1
and df2
with column col
:
val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL
So unless you can manage to transform a column of an existing data frame into the shape you need, you cannot use withColumn or withColumnRenamed to append arbitrary columns (standalone ones or columns from other data frames).
As was commented above, the workaround may be to use a join. This would be pretty messy, although possible: attaching unique keys, like above with zipWithIndex, to both data frames or columns might work (a rough sketch follows below). Although efficiency is ...
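For completeness, here is a rough sketch of that join-based workaround in the spirit of the Spark 1.4+ API. The names df1, df2 and _row_idx are hypothetical, both data frames are assumed to have the same number of rows, and their column names are assumed not to overlap:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Attach a synthetic row index to a data frame via zipWithIndex
def withRowIndex(df: DataFrame): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(idx +: row.toSeq) }
  val schema = StructType(StructField("_row_idx", LongType, nullable = false) +: df.schema.fields)
  sqlContext.createDataFrame(indexed, schema)
}

// Index both sides, join on the index, then drop it
val combined = withRowIndex(df1)
  .join(withRowIndex(df2), "_row_idx")
  .drop("_row_idx")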
It's clear that appending a column to a data frame is not an easy operation in a distributed environment, and there may not be a very efficient, neat method for it at all. But I think it's still very important to have this core functionality available, even with performance warnings.
Not sure if it works in Spark 1.3, but in Spark 1.5 I use withColumn:
import sqlContext.implicits._
import org.apache.spark.sql.functions._
df.withColumn("newName",lit("newValue"))
I use this when I need to use a value that is not related to existing columns of the dataframe
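As a small usage sketch (the column names and values here are made up), the same pattern also covers a typed NULL column by casting the literal:

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

val tagged = df.withColumn("source", lit("fileA"))                  // constant value
  .withColumn("comment", lit(null).cast(StringType))                // NULL column with an explicit type
tagged.show()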
This is similar to @NehaM's answer but simpler.
I took help from the answer above. However, I find it incomplete if we want to change a DataFrame, and the current APIs are a little different in Spark 1.6.
zipWithIndex() returns a tuple of (Row, Long) which contains each row and its corresponding index. We can use it to build a new Row according to our needs.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType}

val rdd = df.rdd.zipWithIndex()
  .map { case (row, index) => Row.fromSeq(index.toString +: row.toSeq) }
val newStructure = StructType(Seq(StructField("Row number", StringType, true)) ++ df.schema.fields)
sqlContext.createDataFrame(rdd, newStructure).show()
I hope this will be helpful.