I have multiple Spark jobs that read from different sources. Their schemas are different but very close, and I want to write all of them to the same Redshift table. For that I need to unify all the DataFrame schemas. What is the best way to do that?
Let's say the schema of the first input source looks like this:
val schema1 = StructType(Seq(
  StructField("date", DateType),
  StructField("campaign_id", StringType),
  StructField("campaign_name", StringType),
  StructField("platform", StringType),
  StructField("country", StringType),
  StructField("views", DoubleType),
  StructField("installs", DoubleType),
  StructField("spend", DoubleType)
))
And the schema of the second input source looks like this:
val schema2 = StructType(Seq(
  StructField("date", DateType),
  StructField("creator_id", StringType),
  StructField("creator_name", StringType),
  StructField("platform", StringType),
  StructField("views", DoubleType),
  StructField("installs", DoubleType),
  StructField("spend", DoubleType),
  StructField("ecpm", DoubleType)
))
The table schema (the expected unified DataFrame):
val finalSchema = StructType(Seq(
  StructField("date", DateType),
  StructField("account_name", StringType),
  StructField("adset_id", StringType),
  StructField("adset_name", StringType),
  StructField("campaign_id", StringType),
  StructField("campaign_name", StringType),
  StructField("pub_id", StringType),
  StructField("pub_name", StringType),
  StructField("creative_id", StringType),
  StructField("creative_name", StringType),
  StructField("platform", StringType),
  StructField("install_source", StringType),
  StructField("views", IntegerType),
  StructField("clicks", IntegerType),
  StructField("installs", IntegerType),
  StructField("cost", DoubleType)
))
As you can see, the final schema contains some columns that might not exist in an input schema, so those should be null; some columns also need to be renamed; and some input columns, like ecpm, should be dropped.
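For reference, this is roughly the direction I was thinking of (just a sketch; conformToSchema and the rename map are names I made up, and I'm assuming here that spend maps to cost):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

// Per-source rename map (illustrative; for the first source I assume spend -> cost).
val renames1 = Map("spend" -> "cost")

def conformToSchema(df: DataFrame, target: StructType, renames: Map[String, String]): DataFrame = {
  // Apply the per-source renames first.
  val renamed = renames.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }
  // Build the select list from the target schema: existing columns are cast to the
  // target type, missing ones become typed nulls, and anything not in the target
  // (e.g. ecpm) is dropped because it is simply not selected.
  val columns = target.fields.map { f =>
    if (renamed.columns.contains(f.name)) col(f.name).cast(f.dataType).as(f.name)
    else lit(null).cast(f.dataType).as(f.name)
  }
  renamed.select(columns: _*)
}

// val unified1 = conformToSchema(df1, finalSchema, renames1)
// val unified2 = conformToSchema(df2, finalSchema, Map.empty)

Each source could then be conformed like this and the results unioned before writing to Redshift, but I'm not sure it's the cleanest approach.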
Add an index column to both DataFrames and join them on that index so there is a one-to-one row mapping. After that, select only the columns you want from the joined DataFrame.
Say you have two DataFrames like the ones below:
// df1.show
+-----+---+
| name|age|
+-----+---+
|Alice| 25|
| Bob| 29|
| Tom| 26|
+-----+---+
//df2.show
+--------+-------+
| city|country|
+--------+-------+
| Delhi| India|
|New York| USA|
| London| UK|
+--------+-------+
Now add the index columns and get a one-to-one mapping:
import org.apache.spark.sql.functions._

val df1Index = df1.withColumn("index1", monotonically_increasing_id())
val df2Index = df2.withColumn("index2", monotonically_increasing_id())
val joinedDf = df1Index.join(df2Index, df1Index("index1") === df2Index("index2"))
//joinedDf
+-----+---+------+--------+-------+------+
| name|age|index1| city|country|index2|
+-----+---+------+--------+-------+------+
|Alice| 25| 0| Delhi| India| 0|
| Bob| 29| 1|New York| USA| 1|
| Tom| 26| 2| London| UK| 2|
+-----+---+------+--------+-------+------+
Now you can write your query like below:
val queryList = List(col("name"), col("age"), col("country"))
joinedDf.select(queryList: _*).show
//Output df
+-----+---+-------+
| name|age|country|
+-----+---+-------+
|Alice| 25| India|
| Bob| 29| USA|
| Tom| 26| UK|
+-----+---+-------+
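One caveat with this approach: monotonically_increasing_id only guarantees increasing and unique values, not consecutive ones (the partition ID is encoded in the high bits), so the two index columns only line up when both DataFrames have the same partitioning and row order. If you need a guaranteed consecutive index, one option is to drop down to the RDD API; the sketch below uses zipWithIndex (withConsecutiveIndex is just a name picked for illustration):

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Adds a 0-based, consecutive row index using RDD.zipWithIndex.
def withConsecutiveIndex(df: DataFrame, indexName: String): DataFrame = {
  val rowsWithIndex = df.rdd.zipWithIndex.map { case (row, idx) => Row.fromSeq(row.toSeq :+ idx) }
  val schema = StructType(df.schema.fields :+ StructField(indexName, LongType, nullable = false))
  df.sparkSession.createDataFrame(rowsWithIndex, schema)
}

// val joinedDf = withConsecutiveIndex(df1, "index1")
//   .join(withConsecutiveIndex(df2, "index2"), col("index1") === col("index2"))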