I have two DataFrames with the same number of rows, but the number of columns differs and is dynamic depending on the source.
The first DataFrame contains all columns, while the second is filtered and processed, so it is missing some of them.
I need to pick a specific column from the first DataFrame and add/merge it into the second DataFrame.
val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)
val columnName :String="Col1"
I tried adding it in several ways; here are just a few:
val modifiedResult = resultDf.withColumn(columnName, sourceDf.col(columnName))
val modifiedResult = resultDf.withColumn(columnName, sourceDf(columnName))
val modifiedResult = resultDf.withColumn(columnName, labelColumnUdf(sourceDf.col(columnName)))
None of these work.
Can you please help me merge/add a column into the second DataFrame from the first one?
The example below is not my exact data structure, but solving it will resolve my issue.
Sample Input/Output:
Source DataFrame:
+--------+
|InputGas|
+--------+
|    1000|
|    2000|
|    3000|
|    4000|
+--------+
Result DataFrame:
+----+-------+-----+
|Time|CalcGas|Speed|
+----+-------+-----+
|   0|    111| 1111|
|   0|    222| 2222|
|   1|    333| 3333|
|   2|    444| 4444|
+----+-------+-----+
Expected Output:
+----+-------+-----+--------+
|Time|CalcGas|Speed|InputGas|
+----+-------+-----+--------+
|   0|    111| 1111|    1000|
|   0|    222| 2222|    2000|
|   1|    333| 3333|    3000|
|   2|    444| 4444|    4000|
+----+-------+-----+--------+
One way to achieve this is with a join.
If both DataFrames share a common column, you can join on that column and get your desired result.
Example:
import spark.implicits._
val df1 = Seq((1, "Anu"),(2, "Suresh"),(3, "Usha"), (4, "Nisha")).toDF("id","name")
val df2 = Seq((1, 23),(2, 24),(3, 24), (4, 25), (5, 30), (6, 32)).toDF("id","age")
val df = df1.as("df1").join(df2.as("df2"), df1("id") === df2("id")).select("df1.id", "df1.name", "df2.age")
df.show()
Output:
+---+------+---+
| id| name|age|
+---+------+---+
| 1| Anu| 23|
| 2|Suresh| 24|
| 3| Usha| 24|
| 4| Nisha| 25|
+---+------+---+
If you don't have a common unique id in both DataFrames, create one and use it:
import spark.implicits._
import org.apache.spark.sql.functions._
var sourceDf = Seq(1000, 2000, 3000, 4000).toDF("InputGas")
var resultDf = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444)).toDF("Time", "CalcGas", "Speed")
sourceDf = sourceDf.withColumn("rowId1", monotonically_increasing_id())
resultDf = resultDf.withColumn("rowId2", monotonically_increasing_id())
val df = sourceDf.as("df1")
  .join(resultDf.as("df2"), sourceDf("rowId1") === resultDf("rowId2"), "inner")
  .select("df1.InputGas", "df2.Time", "df2.CalcGas", "df2.Speed")
df.show()
Output:
+--------+----+-------+-----+
|InputGas|Time|CalcGas|Speed|
+--------+----+-------+-----+
| 1000| 0| 111| 1111|
| 2000| 0| 222| 2222|
| 3000| 1| 333| 3333|
| 4000| 2| 444| 4444|
+--------+----+-------+-----+
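One caveat: `monotonically_increasing_id` only guarantees ids that are unique and increasing, not consecutive, so on multi-partition DataFrames the two generated id columns may not line up row-for-row. A more deterministic alternative is `RDD.zipWithIndex`, which assigns consecutive 0-based indices in partition order. The pairing logic is the same as Scala's collection `zipWithIndex`; here is a minimal sketch on plain `Seq`s (using the column values from the question's example) showing how the index aligns the two sides:

```scala
// Sketch: align two equally sized row sets by position, the same pairing
// that df.rdd.zipWithIndex provides in Spark (consecutive 0-based indices).
val inputGas = Seq(1000, 2000, 3000, 4000)
val result   = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444))

// Index each InputGas value by its row position.
val indexedGas = inputGas.zipWithIndex.map { case (g, i) => (i, g) }.toMap

// Join on the positional index to append InputGas to each result row.
val merged = result.zipWithIndex.map { case ((time, calc, speed), i) =>
  (time, calc, speed, indexedGas(i))
}

merged.foreach(println)
```

In Spark you would do the equivalent with `df.rdd.zipWithIndex`, convert each indexed RDD back to a DataFrame, and join on the index column; unlike `monotonically_increasing_id`, the indices are guaranteed consecutive on both sides.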