
Data preprocessing with Apache Spark and Scala

I'm pretty new to Spark and Scala, so I have some questions about data preprocessing with Spark and working with RDDs. I'm working on a small project in which I want to implement a machine learning system with Spark. Working with the algorithms is OK, I think, but I have problems with preprocessing the data. I have a dataset with 30 columns and about one million rows. For simplicity, let's assume I have the following dataset (CSV file):

columnA, columnB, column_txt, label
1      , a      , abc       , 0
2      ,        , abc       , 0
3      , b      , abc       , 1
4      , b      , abc       , 1
5      , a      , abc       , 0
6      ,        , abc       , 0
7      , c      , abc       , 1
8      , a      , abc       , 1
9      , b      , abc       , 1
10     , c      , abc       , 0

After loading the data into Spark I want to do the following steps:

  1. Remove all columns whose names end with "_txt"
  2. Filter out all rows where columnB is empty (this I figured out already)
  3. Delete those columns that have more than 9 levels (here columnA)

So I have problems with steps 1 and 3. I know I can't remove columns from an RDD, so I have to create a new RDD, but how do I do that without certain columns? For now I'm loading the CSV file into Spark without a header, but for my tasks I need the header. Is it advisable to load the header into a separate RDD? If so, how can I use that RDD to find the right columns? Sorry, I know that's a lot of questions, but I'm still at the beginning and trying to learn. Thanks and best regards, Chris

csnr asked Jun 20 '26 12:06


1 Answer

Assuming the data is loaded into a DataFrame with headers and the structure is flat:

val df = sqlContext.
    read.
    format("com.databricks.spark.csv").
    option("header", "true").
    load("data.csv")

Something like this should work:

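import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.countDistinct

// True if the column has more than 9 distinct values (levels).
def moreThan9(df: DataFrame, col: String): Boolean = {
    df.agg(countDistinct(col)).first()(0) match {
        case x: Long => x > 9L
        case _ => false
    }
}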

val newDf = df.
    schema. //  Extract schema
    toArray. // Convert to array
    map(_.name). // Map to names
    foldLeft(df)((acc: DataFrame, col: String) => {
        // || short-circuits, so no distinct count is computed for "_txt" columns
        if (col.endsWith("_txt") || moreThan9(acc, col)) acc.drop(col) else acc
    })
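
To see what the selection logic does independently of Spark, here is a minimal sketch on plain Scala collections. The names header, rows, levels, keptIdx, newHeader and newRows are made up for illustration, and the data is a trimmed in-memory copy of the sample from the question. It assumes levels are counted before the empty columnB rows are removed; counted afterwards, columnA would have only 8 levels in this sample and would be kept.

```scala
// Hypothetical in-memory copy of the sample CSV from the question (values trimmed).
val header = Vector("columnA", "columnB", "column_txt", "label")
val rows = Vector(
  Vector("1", "a", "abc", "0"),
  Vector("2", "", "abc", "0"),
  Vector("3", "b", "abc", "1"),
  Vector("4", "b", "abc", "1"),
  Vector("5", "a", "abc", "0"),
  Vector("6", "", "abc", "0"),
  Vector("7", "c", "abc", "1"),
  Vector("8", "a", "abc", "1"),
  Vector("9", "b", "abc", "1"),
  Vector("10", "c", "abc", "0")
)

// Number of distinct values ("levels") in the column at position idx,
// counted over all rows (assumption: before any row filtering).
def levels(idx: Int): Int = rows.map(_(idx)).distinct.size

// Keep a column unless its name ends with "_txt" or it has more than 9 levels.
val keptIdx = header.zipWithIndex.collect {
  case (name, idx) if !name.endsWith("_txt") && levels(idx) <= 9 => idx
}

// Header of the reduced dataset.
val newHeader = keptIdx.map(header)

// Drop rows with an empty columnB, then project each row onto the kept columns.
val bIdx = header.indexOf("columnB")
val newRows = rows.filter(_(bIdx).nonEmpty).map(row => keptIdx.map(row))
```

The same filter and map steps should carry over to an RDD of parsed rows, with the header kept as an ordinary local collection rather than a separate RDD.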

If the data is loaded without a header, you can do the same thing using a mapping from the automatically assigned column names to the actual ones.
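
A rough sketch of such a mapping, in plain Scala. The auto-generated names "C0" to "C3" are an assumption (the exact names depend on the CSV reader and its version), and autoNames, actualNames, autoToActual and txtColumns are illustrative names only.

```scala
// Assumed auto-generated column names; check df.columns for the real ones.
val autoNames = Vector("C0", "C1", "C2", "C3")

// Actual header, e.g. taken from the first line of the CSV file.
val actualNames =
  "columnA, columnB, column_txt, label".split(",").map(_.trim).toVector

// Mapping from each auto-generated name to the actual column name.
val autoToActual: Map[String, String] = autoNames.zip(actualNames).toMap

// Auto-generated names of the columns whose actual name ends with "_txt".
val txtColumns = autoNames.filter(auto => autoToActual(auto).endsWith("_txt"))
```

With a DataFrame at hand, it may be simpler to rename everything up front with df.toDF(actualNames: _*) and then reuse the fold from above unchanged.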

zero323 answered Jun 23 '26 07:06