I have multiple Spark jobs reading from different sources. Their schemas differ but are very similar, and I want to write all of them to the same Redshift table. To do that I need to unify all the DataFrame schemas. What is the best way to do it?
Let's say the schema of the first input source is:
val schema1 = StructType(Seq(
  StructField("date", DateType),
  StructField("campaign_id", StringType),
  StructField("campaign_name", StringType),
  StructField("platform", StringType),
  StructField("country", StringType),
  StructField("views", DoubleType),
  StructField("installs", DoubleType),
  StructField("spend", DoubleType)
))
And the schema of the second input source is:
val schema2 = StructType(Seq(
  StructField("date", DateType),
  StructField("creator_id", StringType),
  StructField("creator_name", StringType),
  StructField("platform", StringType),
  StructField("views", DoubleType),
  StructField("installs", DoubleType),
  StructField("spend", DoubleType),
  StructField("ecpm", DoubleType)
))
The table schema (the expected unified DataFrame):
val finalSchema = StructType(Seq(
  StructField("date", DateType),
  StructField("account_name", StringType),
  StructField("adset_id", StringType),
  StructField("adset_name", StringType),
  StructField("campaign_id", StringType),
  StructField("campaign_name", StringType),
  StructField("pub_id", StringType),
  StructField("pub_name", StringType),
  StructField("creative_id", StringType),
  StructField("creative_name", StringType),
  StructField("platform", StringType),
  StructField("install_source", StringType),
  StructField("views", IntegerType),
  StructField("clicks", IntegerType),
  StructField("installs", IntegerType),
  StructField("cost", DoubleType)
))
As you can see, the final schema contains some columns that may not exist in an input schema, so those should be null; some columns also need to be renamed; and some columns, like ecpm, should be dropped.
Add an index column to both DataFrames and join them on it so that there is a one-to-one mapping between rows. After that, select only the desired columns from the joined DataFrame.
Say you have two DataFrames like below:
// df1.show
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
| Bob| 29|
| Tom| 26|
+-----+---+
//df2.show
+--------+-------+
| city|country|
+--------+-------+
| Delhi| India|
|New York| USA|
| London| UK|
+--------+-------+
Now add the index columns and get the one-to-one mapping:
import org.apache.spark.sql.functions._

// monotonically_increasing_id() is the current API; monotonicallyIncreasingId is deprecated
val df1Index = df1.withColumn("index1", monotonically_increasing_id())
val df2Index = df2.withColumn("index2", monotonically_increasing_id())
val joinedDf = df1Index.join(df2Index, df1Index("index1") === df2Index("index2"))
//joinedDf
+-----+---+------+--------+-------+------+
| name|age|index1| city|country|index2|
+-----+---+------+--------+-------+------+
|Alice| 25| 0| Delhi| India| 0|
| Bob| 29| 1|New York| USA| 1|
| Tom| 26| 2| London| UK| 2|
+-----+---+------+--------+-------+------+
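One caveat: monotonically_increasing_id() encodes the partition ID in the upper bits of the generated value, so the two index columns only line up when both DataFrames have identical partitioning and per-partition row counts. If that is not guaranteed, a heavier but deterministic alternative is RDD.zipWithIndex, which assigns consecutive 0-based indices. A sketch (the helper name withRowIndex is just illustrative):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append a consecutive 0-based row index, independent of partitioning
def withRowIndex(df: DataFrame, colName: String): DataFrame = {
  val indexed = df.rdd.zipWithIndex().map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  df.sparkSession.createDataFrame(indexed, StructType(df.schema.fields :+ StructField(colName, LongType)))
}

val joinedByIndex = withRowIndex(df1, "index1")
  .join(withRowIndex(df2, "index2"), col("index1") === col("index2"))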
Now you can write your query like below:
val queryList = List(col("name"), col("age"), col("country"))
joinedDf.select(queryList: _*).show
//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25| India|
| Bob| 29| USA|
| Tom| 26| UK|
+-----+---+-------+
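Coming back to the schemas in the question, the same "select only the columns you want" idea can be used to conform each input DataFrame to finalSchema before the union and the write to Redshift: columns the source lacks become typed nulls, renamed columns are selected under their source name and aliased, and extra columns such as ecpm are simply not selected. A rough sketch, where the rename mapping (e.g. spend -> cost, creator_id -> pub_id) and the names campaignDf / creatorDf are assumptions rather than anything stated in the question:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, lit}

// Conform a DataFrame to finalSchema: existing columns are cast to the target type,
// renamed columns are picked up under their source name, missing columns become
// typed nulls, and anything not in finalSchema (e.g. ecpm) is dropped.
// `renames` maps target column name -> source column name.
def conformTo(df: DataFrame, renames: Map[String, String] = Map.empty): DataFrame = {
  val cols: Seq[Column] = finalSchema.fields.toSeq.map { field =>
    val source = renames.getOrElse(field.name, field.name)
    if (df.columns.contains(source)) col(source).cast(field.dataType).as(field.name)
    else lit(null).cast(field.dataType).as(field.name)
  }
  df.select(cols: _*)
}

// campaignDf / creatorDf stand for the DataFrames read with schema1 / schema2;
// the rename maps are only a guess at how the sources map to the table.
val unified = conformTo(campaignDf, Map("cost" -> "spend"))
  .unionByName(conformTo(creatorDf,
    Map("cost" -> "spend", "pub_id" -> "creator_id", "pub_name" -> "creator_name")))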