
How to update column based on a condition (a value in a group)?

I have the following df:

+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn| blue|
|  3|  fn|green|
+---+----+-----+

If any of the values in the color column is red, then all values of the color column should be updated to red, as below:

+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn|  red|
|  3|  fn|  red|
+---+----+-----+

I could not figure it out. Please help; I have tried the following code:

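import org.apache.spark.sql.Row

val gp = jdbcDF.filter($"dept".contains("fn"))
//.withColumn("newone", when($"dept" === "fn", "RED").otherwise("NULL"))
gp.show()

gp.rdd.map { row =>
  val dept = row.getAs[String](1)
  val color = row.getAs[String](2)
  // an if without else (or an assignment) evaluates to Unit, so yield a value in both branches
  val make = if (dept == "fn") "red" else color
  Row(row(0), row(1), make)
}.collect().foreach(println)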
asked Nov 19 '16 10:11 by senthil kumar p



2 Answers

Given:

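import org.apache.spark.sql.functions._
import spark.implicits._ // toDF and 'symbol columns; spark-shell imports this automatically

val df = Seq(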
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "fn", "color")

Do the calculation:

val redOrNot = df.groupBy("fn")
  .agg(collect_set('color) as "values")
  .withColumn("hasRed", array_contains('values, "red"))

// yields null for groups without red (there is no otherwise)
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
  .withColumn("resultColor", colorPicker)
  .withColumn("color", coalesce('resultColor, 'color)) // a null resultColor falls back to the original color
  .select('id, 'fn, 'color)

The result looks as follows, which is the expected answer:

scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
|  1| fn|  red|
|  2| fn|  red|
|  3| fn|  red|
|  4| aa| blue|
|  5| aa|green|
|  6| bb|  red|
|  7| bb|  red|
|  8| aa| blue|
+---+---+-----+

You can chain when operators and provide a default value with otherwise. Consult the scaladoc of the when operator.
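As a sketch of that suggestion (assuming Spark 2.x and the same df with an "fn" grouping column), the when + coalesce pair above can be collapsed into a single when ... otherwise chain. The object name WhenOtherwiseSketch and the extra isNull branch are hypothetical; the isNull branch exists only to show chaining.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{array_contains, col, collect_set, lit, when}

object WhenOtherwiseSketch {
  def fillRed(df: DataFrame): DataFrame = {
    // Same grouping step as above: one row per "fn" group with a hasRed flag.
    val redOrNot = df.groupBy("fn")
      .agg(collect_set(col("color")).as("values"))
      .withColumn("hasRed", array_contains(col("values"), "red"))

    // The first matching branch wins; otherwise(...) keeps the original color.
    // The isNull branch is a hypothetical extra rule that only illustrates chaining.
    val colorPicker = when(col("hasRed"), lit("red"))
      .when(col("color").isNull, lit("unknown"))
      .otherwise(col("color"))

    df.join(redOrNot, "fn")
      .withColumn("color", colorPicker)
      .select("id", "fn", "color")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("when-otherwise").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "fn", "red"), (2, "fn", "blue"), (3, "aa", "green")).toDF("id", "fn", "color")
    fillRed(df).orderBy("id").show()
    spark.stop()
  }
}
```

Because otherwise supplies the fallback directly, the intermediate resultColor column and the coalesce call are no longer needed.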

I think you could do something very similar (and perhaps more efficiently) using window operators or user-defined aggregate functions (UDAFs), but, well, I don't currently know how to do it. I'm leaving this comment here to inspire others ;-)
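One way to follow up on the window idea, sketched under the assumption of Spark 2.x: partition by the group column and take the max of a 0/1 "is red" flag over the partition, so every row learns whether its group contains red without a separate groupBy and join. This is a swapped-in technique, not the approach used in the answer above, and WindowFillSketch is a hypothetical name. Because the window has no ordering, its frame covers the whole partition.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max, when}

object WindowFillSketch {
  def fillRed(df: DataFrame): DataFrame = {
    // All rows sharing the same "fn" value; no orderBy, so the frame is the entire group.
    val byGroup = Window.partitionBy("fn")
    // 1 if any row in the group is red, else 0; compare to 1 to get a boolean column.
    val groupHasRed = max(when(col("color") === "red", 1).otherwise(0)).over(byGroup) === 1
    // Replacing an existing column keeps its position, so the schema stays id, fn, color.
    df.withColumn("color", when(groupHasRed, lit("red")).otherwise(col("color")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("window-fill").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "fn", "red"), (2, "fn", "blue"), (3, "aa", "green")).toDF("id", "fn", "color")
    fillRed(df).orderBy("id").show()
    spark.stop()
  }
}
```

Window functions still shuffle the data by the partition key, so whether this is actually faster than the groupBy + join version depends on the data; it mainly avoids the explicit join.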

P.S. I learnt a lot! Thanks for the idea!

answered Oct 21 '22 15:10 by Jacek Laskowski


Spark 2.2.0: sample DataFrame (taken from the example above)

val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "dept", "color")

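Create a UDF that performs the update by checking the condition:

import org.apache.spark.sql.functions.udf

// "fn" rows whose color is not already red become red; calling equalsIgnoreCase on the literal keeps it null-safe
val replace_val = udf((x: String, y: String) =>
  if ("fn".equalsIgnoreCase(x) && !"red".equalsIgnoreCase(y)) "red" else y)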

val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()
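The same per-row rule can also be written with built-in column functions instead of a UDF, which generally lets Spark's optimizer see the whole expression and avoids calling into Scala code for every row. A minimal sketch, assuming Spark 2.x (=!= was added in 2.0) and a DataFrame with dept and color columns; BuiltinRuleSketch is a hypothetical name. For non-null values it should match the UDF, including the case-insensitive comparisons. Like the UDF, it hard-codes the "fn" department rather than checking whether a group contains red.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit, lower, when}

object BuiltinRuleSketch {
  // Mirrors the UDF: "fn" rows (case-insensitive) whose color is not already
  // red (case-insensitive) become "red"; every other row keeps its color.
  def applyRule(df: DataFrame): DataFrame = {
    val isFn = lower(col("dept")) === "fn"
    val notRed = lower(col("color")) =!= "red"
    df.withColumn("color", when(isFn && notRed, lit("red")).otherwise(col("color")))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("builtin-rule").getOrCreate()
    import spark.implicits._
    val df = Seq((1, "fn", "red"), (2, "fn", "blue"), (3, "aa", "green"), (4, "FN", "Red"))
      .toDF("id", "dept", "color")
    applyRule(df).orderBy("id").show()
    spark.stop()
  }
}
```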


Spark 1.6:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// For implicit conversions like converting RDDs to DataFrames
import sqlContext.implicits._

val df = sc.parallelize(Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
)).toDF("id", "dept", "color")

// "fn" rows whose color is not already red become red (null-safe comparisons)
val replace_val = udf((x: String, y: String) =>
  if ("fn".equalsIgnoreCase(x) && !"red".equalsIgnoreCase(y)) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept", $"color"))

final_df.show()
answered Oct 21 '22 15:10 by Vamshavardhan Reddy