Append a column to Data Frame in Apache Spark 1.3

Is it possible, and what would be the most efficient and neat method, to add a column to a DataFrame?

More specifically, the column may serve as row IDs for the existing DataFrame.

In a simplified case, reading from a file and not tokenizing it, I can think of something like the below (in Scala), but it completes with errors (at line 3) and anyway doesn't look like the best route possible:

var dataDF = sc.textFile("path/file").toDF()
val rowDF = sc.parallelize(1 to dataDF.count().toInt).toDF("ID")
dataDF = dataDF.withColumn("ID", rowDF("ID"))
asked Apr 07 '15 by Oleg Shirokikh




3 Answers

It's been a while since I posted the question and it seems that some other people would like to get an answer as well. Below is what I found.

So the original task was to append a column with row identifiers (basically, a sequence 1 to numRows) to any given data frame, so that row order/presence can be tracked (e.g. when you sample). This can be achieved by something along these lines:

import org.apache.spark.sql.Row

val rowRDD = sc.textFile(file).                        // textFile lives on SparkContext, not SQLContext
  zipWithIndex().                                      // pair each line with its index
  map { case (d, i) => i.toString + delimiter + d }.   // prepend the index as a new field
  map(_.split(delimiter)).
  map(s => Row.fromSeq(s.toSeq))
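
To get from that indexed RDD[Row] back to a data frame, a schema with the extra ID field is still needed. A minimal sketch, assuming for illustration a single text field per line (the names ID and data are placeholders, not from the original post):

import org.apache.spark.sql.types.{StructType, StructField, StringType}

// "ID" is the generated row identifier; "data" stands in for the real columns
val schema = StructType(Seq(
  StructField("ID", StringType, nullable = false),
  StructField("data", StringType, nullable = true)))
val dataDF = sqlContext.createDataFrame(rowRDD, schema)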

Regarding the general case of appending any column to any data frame:

The "closest" to this functionality in Spark API are withColumn and withColumnRenamed. According to Scala docs, the former Returns a new DataFrame by adding a column. In my opinion, this is a bit confusing and incomplete definition. Both of these functions can operate on this data frame only, i.e. given two data frames df1 and df2 with column col:

val df = df1.withColumn("newCol", df1("col") + 1) // -- OK
val df = df1.withColumn("newCol", df2("col") + 1) // -- FAIL

So unless you can manage to transform a column of an existing data frame into the shape you need, you can't use withColumn or withColumnRenamed to append arbitrary columns (standalone ones or ones from other data frames).

As was commented above, a workaround may be to use a join: attach unique keys (like above, with zipWithIndex) to both data frames and join on them. This would be pretty messy, although possible; a rough sketch follows. Whether it is efficient is another question.
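
For completeness, here is a minimal sketch of that join-based workaround (the helper withIndex and the key names idx1/idx2 are made up for illustration; it assumes both frames have the same number of rows and that sqlContext is in scope):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructType, StructField, LongType}

// Attach a synthetic index column to an arbitrary data frame via zipWithIndex
def withIndex(df: DataFrame, colName: String): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (row, i) => Row.fromSeq(row.toSeq :+ i) }
  val schema = StructType(df.schema.fields :+ StructField(colName, LongType, nullable = false))
  sqlContext.createDataFrame(indexed, schema)
}

val left = withIndex(df1, "idx1")
val right = withIndex(df2, "idx2")
val joined = left.join(right, left("idx1") === right("idx2"))
// the synthetic key columns can then be projected away with select(...)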

It's clear that appending a column to a data frame is not easy functionality in a distributed environment, and there may not be a very efficient, neat method for it at all. But I think it's still very important to have this core functionality available, even with performance warnings.

answered Oct 16 '22 by Oleg Shirokikh


Not sure if it works in Spark 1.3, but in Spark 1.5 I use withColumn:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

df.withColumn("newName", lit("newValue"))

I use this when I need a value that is not related to the existing columns of the dataframe.
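
For instance, to add a typed null column (a small sketch; lit(null) on its own is untyped, so it is cast explicitly, and the column name emptyCol is made up):

import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.StringType

// the cast gives the null column a concrete type instead of NullType
val dfWithEmpty = df.withColumn("emptyCol", lit(null).cast(StringType))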

This is similar to @NehaM's answer, but simpler.

answered Oct 16 '22 by Tal Joffe


I took help from the answer above. However, I find it incomplete if we want to modify a DataFrame, and the current APIs are a little different in Spark 1.6. zipWithIndex() returns a tuple of (Row, Long) containing each row and its corresponding index. We can use it to create a new Row according to our needs.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

val rdd = df.rdd.zipWithIndex()
             .map(indexedRow => Row.fromSeq(indexedRow._2.toString +: indexedRow._1.toSeq))
val newstructure = StructType(Seq(StructField("Row number", StringType, true)).++(df.schema.fields))
sqlContext.createDataFrame(rdd, newstructure).show

I hope this will be helpful.
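
As a side note: if the IDs only need to be unique rather than a consecutive 1-to-numRows sequence, and you are on Spark 1.4 or later, the built-in monotonicallyIncreasingId function avoids the round trip through the RDD API (this is an alternative technique, not what the answer above uses):

import org.apache.spark.sql.functions.monotonicallyIncreasingId

// IDs are unique and increasing, but not consecutive
val withId = df.withColumn("ID", monotonicallyIncreasingId())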

answered Oct 16 '22 by NehaM