I have the following df:
+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn| blue|
|  3|  fn|green|
+---+----+-----+
If any of the color column values is red, then I all values of the color column should be updated to be red, as below: 
+---+----+-----+
|sno|dept|color|
+---+----+-----+
|  1|  fn|  red|
|  2|  fn|  red|
|  3|  fn|  red|
+---+----+-----+
I could not figure it out. Please help; I have tried following code:
val gp=jdbcDF.filter($"dept".contains("fn"))
     //.withColumn("newone",when($"dept"==="fn","RED").otherwise("NULL"))
    gp.show()
gp.map(
  row=>{
    val row1=row.getAs[String](1)
    var row2=row.getAs[String](2)
    val make=if(row1 =="fn") row2="red"
    Row(row(0),row(1),make)
  }
).collect().foreach(println)
To do a conditional update depending on whether the current value of a column matches the condition, you can add a WHERE clause which specifies this. The database will first find rows which match the WHERE clause and then only perform updates on those rows.
You can replace values of all or selected columns based on the condition of pandas DataFrame by using DataFrame. loc[ ] property. The loc[] is used to access a group of rows and columns by label(s) or a boolean array. It can access and can also manipulate the values of pandas DataFrame.
Spark withColumn() function of the DataFrame is used to update the value of a column. withColumn() function takes 2 arguments; first the column you wanted to update and the second the value you wanted to update with. If the column name specified not found, it creates a new column with the value specified.
In Microsoft SQL Server, we can change the order of the columns and can add a new column by using ALTER command. ALTER TABLE is used to add, delete/drop or modify columns in the existing table. It is also used to add and drop various constraints on the existing table.
Given:
val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "fn", "color")
Do the calculation:
val redOrNot = df.groupBy("fn")
  .agg(collect_set('color) as "values")
  .withColumn("hasRed", array_contains('values, "red"))
// gives null for no option
val colorPicker = when('hasRed, "red")
val result = df.join(redOrNot, "fn")
  .withColumn("resultColor", colorPicker) 
  .withColumn("color", coalesce('resultColor, 'color)) // skips nulls that leads to the answer
  .select('id, 'fn, 'color)
The result looks as follows (that seems to be an answer):
scala> result.show
+---+---+-----+
| id| fn|color|
+---+---+-----+
|  1| fn|  red|
|  2| fn|  red|
|  3| fn|  red|
|  4| aa| blue|
|  5| aa|green|
|  6| bb|  red|
|  7| bb|  red|
|  8| aa| blue|
+---+---+-----+
You can chain when operators and have a default value with otherwise. Consult the scaladoc of when operator.
I think you could do something very similar (and perhaps more efficient) using windowed operators or user-defined aggregate functions (UDAF), but...well...don't currently know how to do it. Leaving the comment here to inspire others ;-)
p.s. Learnt a lot! Thanks for the idea!
Spark 2.2.0: Sample Dataframe ( taken from above examples)
    val df = Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
).toDF("id", "dept", "color")
created a UDF to perform the update by checking the condition.
val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()
output:

spark 1.6:
val conf = new SparkConf().setMaster("local").setAppName("My app")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// For implicit conversions like converting RDDs to DataFrames
val df = sc.parallelize(Seq(
  (1, "fn", "red"),
  (2, "fn", "blue"),
  (3, "fn", "green"),
  (4, "aa", "blue"),
  (5, "aa", "green"),
  (6, "bb", "red"),
  (7, "bb", "red"),
  (8, "aa", "blue")
) ).toDF("id","dept","color")
val replace_val = udf((x: String,y:String) => if (Option(x).getOrElse("").equalsIgnoreCase("fn")&&(!y.equalsIgnoreCase("red"))) "red" else y)
val final_df = df.withColumn("color", replace_val($"dept",$"color"))
final_df.show()
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With