
Concatenate columns in Apache Spark DataFrame

People also ask

How do I concatenate columns in spark DataFrame?

In PySpark, concat() joins two or more columns of a DataFrame into a single new column. Use select() to view the concatenated column and alias() to name it.

How do you concatenate 3 columns in Pyspark?

Using the concat_ws() function of PySpark SQL, three string input columns (firstname, middlename, lastname) are concatenated into a single string column (Fullname), with the values separated by the "_" separator.

How do you add two string columns in Pyspark?

Concatenating columns in PySpark is accomplished with the concat() function, whether you are joining two columns or several.

How do you add two columns in spark?

You can add multiple columns to a Spark DataFrame in several ways. If you want to add a known set of columns, you can chain withColumn() calls or use select(). However, if you need to add multiple columns after applying some transformations, you can use either map() or foldLeft().


With raw SQL you can use CONCAT:

  • In Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    # registerTempTable is deprecated since Spark 2.0; use createOrReplaceTempView there
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • In Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    // registerTempTable is deprecated since Spark 2.0; use createOrReplaceTempView there
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

Since Spark 1.5.0 you can use the concat function with the DataFrame API:

  • In Python

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • In Scala

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))
    

There is also the concat_ws function, which takes a string separator as its first argument.


Here's how you can give the concatenated column a custom name:

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

gives,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

Create a new column by concatenating:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+

One option to concatenate string columns in Spark Scala is using concat.

It is necessary to check for null values, because if one of the columns is null, the result will be null even if the other columns contain data.

Using concat and withColumn:

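import org.apache.spark.sql.functions.{col, concat, lit, when}

// Replace nulls with the literal string "null" so concat never returns null
val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))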

Using concat and selectExpr:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

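With both approaches you get a NEW_COLUMN whose value is the concatenation of COL1 and COL2 from your original df. Note that the first approach substitutes the literal string "null" for missing values while the second substitutes an empty string, and that selectExpr returns only NEW_COLUMN rather than all of the original columns.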


concat(*cols)

v1.5 and higher

Concatenates multiple input columns together into a single column. The function works with strings, binary and compatible array columns.

Eg: new_df = df.select(concat(df.a, df.b, df.c))


concat_ws(sep, *cols)

v1.5 and higher

Similar to concat but uses the specified separator.

Eg: new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat(*cols)

v2.4 and higher

Concatenates maps, returning the union of all the given maps.

Eg: new_df = df.select(map_concat("map1", "map2"))


Using string concat operator (||):

v2.3 and higher

Eg: df = spark.sql("select col_a || col_b || col_c as abc from table_x")

Reference: Spark sql doc


If you want to do it with the DataFrame API, you could use a UDF to add a new column based on existing columns.

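import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // enables the $"colName" syntax used below
case class MyDf(col1: String, col2: String)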

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()