I'm pretty new to Spark and Scala, so I have some questions about data preprocessing with Spark and working with RDDs. I'm working on a small project and want to implement a machine learning system with Spark. Working with the algorithms is OK, I think, but I have problems with preprocessing the data. I have a dataset with 30 columns and about one million rows. For simplicity, let's assume I have the following dataset (CSV file):
columnA, columnB, column_txt, label
1 , a , abc , 0
2 , , abc , 0
3 , b , abc , 1
4 , b , abc , 1
5 , a , abc , 0
6 , , abc , 0
7 , c , abc , 1
8 , a , abc , 1
9 , b , abc , 1
10 , c , abc , 0
After loading the data in Spark I want to do the following steps:
So I have problems with issues 1 and 3. I know I can't remove columns from an RDD, so I have to create a new RDD, but how do I do that without certain columns? For now I'm loading the CSV file into Spark without a header, but for my tasks I need it. Is it advisable to load the header into a separate RDD? And how can I then interact with that RDD to find the right columns? Sorry, I know that's a lot of questions, but I'm still at the beginning and trying to learn. Thanks and best regards, Chris
Assuming the data frame is loaded with headers and the structure is flat:
val df = sqlContext.
read.
format("com.databricks.spark.csv").
option("header", "true").
load("data.csv")
Something like this should work:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.countDistinct
// True if the column has more than 9 distinct (non-null) values
def moreThan9(df: DataFrame, col: String) = {
  df.agg(countDistinct(col)).first()(0) match {
    case x: Long => x > 9L
    case _ => false
  }
}
val newDf = df.
  schema. // Extract schema
  toArray. // Convert to array
  map(_.name). // Map to names
  foldLeft(df)((df: DataFrame, col: String) => {
    // || short-circuits, so no aggregation runs for columns ending in "_txt"
    if (col.endsWith("_txt") || moreThan9(df, col)) df.drop(col) else df
  })
If it is loaded without a header, you can do the same thing using a mapping from the automatically assigned column names to the actual ones.
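If you stay with plain RDDs and keep the header separately, the same rule can be expressed on column indices instead of names. Below is a rough sketch on ordinary Scala collections, so it runs without a cluster; `ColumnFilterSketch`, `header`, `rows`, `distinctCount` and `keep` are names made up for this illustration, and the rows are the sample data from the question. With a real RDD the lines would presumably come from `sc.textFile` and the distinct count would use `distinct().count()` instead. Note that empty strings count as a value here, unlike nulls in `countDistinct`.

```scala
object ColumnFilterSketch {
  // Header kept apart from the data rows (hypothetical stand-in for a separately loaded header).
  val header: Array[String] = Array("columnA", "columnB", "column_txt", "label")

  // Sample rows from the question; split with limit -1 so empty fields are preserved.
  val rows: Seq[Array[String]] = Seq(
    "1,a,abc,0", "2,,abc,0", "3,b,abc,1", "4,b,abc,1", "5,a,abc,0",
    "6,,abc,0", "7,c,abc,1", "8,a,abc,1", "9,b,abc,1", "10,c,abc,0"
  ).map(_.split(",", -1).map(_.trim))

  // Number of distinct values in column i.
  // On an RDD[Array[String]] this would be rows.map(_(i)).distinct().count().
  def distinctCount(i: Int): Long = rows.map(_(i)).distinct.size.toLong

  // Keep a column only if its name does not end with "_txt"
  // and it has at most 9 distinct values.
  val keep: Array[Int] = header.indices.toArray.filter { i =>
    !header(i).endsWith("_txt") && distinctCount(i) <= 9L
  }

  // Project header and rows onto the kept indices; this builds new collections
  // rather than removing anything in place, just as a new RDD would be built.
  val newHeader: Array[String] = keep.map(i => header(i))
  val newRows: Seq[Array[String]] = rows.map(r => keep.map(i => r(i)))
}
```

For the sample data this keeps `columnB` and `label`: `columnA` has 10 distinct values and `column_txt` matches the suffix rule. The header array is what lets you translate between positions and names, so there is no need to put it into an RDD of its own.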