Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

add column from one dataframe to another dataframe in scala [duplicate]

I have two DataFrame with same number of row, but number of column is different and dynamic according to source.

First DataFrame contains all columns, but the second DataFrame is filtered and processed which don't have all other.

Need to pick specific column from first DataFrame and add/merge with second DataFrame.

val sourceDf = spark.read.load(parquetFilePath)
val resultDf = spark.read.load(resultFilePath)

val columnName :String="Col1"

I tried to add in several ways, here i am just giving few one....

val modifiedResult = resultDf.withColumn(columnName, sourceDf.col(columnName))

val modifiedResult = resultDf.withColumn(columnName, sourceDf(columnName))
val modifiedResult = resultDf.withColumn(columnName, labelColumnUdf(sourceDf.col(columnName)))

None of these are working.

Can you please help me on this to merge/add column to the 2nd DataFrame from 1st DataFrame.

Given example are not the exact data structure that i need, but it will fulfill my requirement to resolve this issue.

Sample Input Output:

Source DataFrame:
+---+------+---+
|InputGas|
+---+------+---+
|1000|
|2000|
|3000|
|4000|
+---+------+---+

Result DataFrame:
+---+------+---+
| Time|CalcGas|Speed|
+---+------+---+
|  0 | 111| 1111|
|  0 | 222| 2222|
|  1 | 333| 3333|
|  2 | 444| 4444|
+---+------+---+

Expected Output:
+---+------+---+
|Time|CalcGas|Speed|InputGas|
+---+------+---+---+
|  0|111 | 1111 |1000|
|  0|222 | 2222 |2000|
|  1|333 | 3333 |3000|
|  2|444 | 4444 |4000|
+---+------+---+---+
like image 797
Yousuf Zaman Avatar asked Oct 31 '17 05:10

Yousuf Zaman


1 Answers

one way to achieve this using join

In case if you have some common column in both the dataframes then you can perform join on that column and get your desire result.

Example:

import sparkSession.sqlContext.implicits._

val df1 = Seq((1, "Anu"),(2, "Suresh"),(3, "Usha"), (4, "Nisha")).toDF("id","name")
val df2 = Seq((1, 23),(2, 24),(3, 24), (4, 25), (5, 30), (6, 32)).toDF("id","age")

val df = df1.as("df1").join(df2.as("df2"), df1("id") === df2("id")).select("df1.id", "df1.name", "df2.age")
df.show()

Output:

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|   Anu| 23|
|  2|Suresh| 24|
|  3|  Usha| 24|
|  4| Nisha| 25|
+---+------+---+

Update:

In case if you don't have any unique id common in both dataframes, then create one and use it.

import sparkSession.sqlContext.implicits._
import org.apache.spark.sql.functions._

var sourceDf = Seq(1000, 2000, 3000, 4000).toDF("InputGas")
var resultDf  = Seq((0, 111, 1111), (0, 222, 2222), (1, 333, 3333), (2, 444, 4444)).toDF("Time", "CalcGas", "Speed")

sourceDf = sourceDf.withColumn("rowId1", monotonically_increasing_id())
resultDf = resultDf.withColumn("rowId2", monotonically_increasing_id())

val df = sourceDf.as("df1").join(resultDf.as("df2"), sourceDf("rowId1") === resultDf("rowId2"), "inner").select("df1.InputGas", "df2.Time", "df2.CalcGas", "df2.Speed")
df.show()

Output:

+--------+----+-------+-----+
|InputGas|Time|CalcGas|Speed|
+--------+----+-------+-----+
|    1000|   0|    111| 1111|
|    2000|   0|    222| 2222|
|    3000|   1|    333| 3333|
|    4000|   2|    444| 4444|
+--------+----+-------+-----+
like image 199
Prasad Khode Avatar answered Nov 08 '22 23:11

Prasad Khode