Replace new line (\n) character in csv file - spark scala

Just to illustrate the problem, I have taken a test-set CSV file. But in the real-case scenario, the solution has to handle more than a terabyte of data.

I have a CSV file where the columns are enclosed by quotes ("col1"). But after the data import was done, one column turned out to contain newline characters (\n). This is causing me a lot of problems when I want to save the data as Hive tables.

My idea was to replace the \n character with a "|" pipe in Spark.

What I have achieved so far:

1. val test = sqlContext.load(
       "com.databricks.spark.csv",
       Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "true",
           "delimiter" -> ",", "quote" -> "\"", "escape" -> "\\", "parserLib" -> "univocity")) // read the CSV file

2. val dataframe = test.toDF() // convert to a DataFrame

3. dataframe.foreach(println) // print

4. dataframe.map(row => {
       val row4 = row.getAs[String](4)
       val make = row4.replaceAll("[\r\n]", "|")
       (make)
     }).collect().foreach(println) // the replace is not working for me

Sample set:

(17 , D73 ,525, 1  ,testing\n    ,  90 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,529, 1  ,once \n again,  10 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,531, 1  ,test3\n      ,  10 ,20.07.2011 ,null ,F10 , R)

Expected result set:

(17 , D73 ,525, 1  ,testing|    ,  90 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,526, 1  ,null         ,  89 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,529, 1  ,once | again,  10 ,20.07.2011 ,null ,F10 , R)
 (17 , D73 ,531, 1  ,test3|      ,  10 ,20.07.2011 ,null ,F10 , R)

What worked for me:

val rep = "\n123\n Main Street\n".replaceAll("[\\r\\n]", "|")
// rep: String = |123| Main Street|

But why am I not able to do the same on a tuple basis?

val dataRDD = lines_wo_header.map(line => line.split(";")).map(row =>
  (row(0).toLong, row(1).toString,
   row(2).toLong, row(3).toLong,
   row(4).toString, row(5).toLong,
   row(6).toString, row(7).toString, row(8).toString, row(9).toString))

dataRDD.map(row => {
  val wert = row._5.replaceAll("[\\r\\n]", "|")
  (row._1, row._2, row._3, row._4, wert, row._6, row._7, row._8, row._9, row._10)
}).collect().foreach(println)
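The tuple-based replaceAll above is not wrong in itself; plain Scala handles it fine. One possible cause (an assumption about the data, not something the question confirms) is that the fifth field holds a literal two-character backslash-n sequence rather than a real newline character, which the regex character class [\r\n] does not match. A minimal, self-contained Scala sketch of the difference:

```scala
object NewlineDemo {
  def main(args: Array[String]): Unit = {
    // A real newline character IS matched by the character class [\r\n]:
    val real = "once \n again".replaceAll("[\\r\\n]", "|")
    assert(real == "once | again")

    // A literal backslash followed by 'n' is NOT matched by [\r\n]:
    val literal = "once \\n again".replaceAll("[\\r\\n]", "|")
    assert(literal == "once \\n again")

    // Matching the two-character sequence \n requires escaping the backslash:
    val fixedLiteral = "once \\n again".replaceAll("\\\\n", "|")
    assert(fixedLiteral == "once | again")

    // The same replaceAll works fine on a tuple field, as in the RDD code above:
    val rows = Seq((17L, "D73", 529L, 1L, "once \n again"))
    val cleaned = rows.map { case (a, b, c, d, e) =>
      (a, b, c, d, e.replaceAll("[\\r\\n]", "|"))
    }
    assert(cleaned.head._5 == "once | again")
    println("ok")
  }
}
```

If the field really contains a literal \n, swapping the pattern to "\\\\n" in the tuple mapping should produce the expected result set.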

Spark version: 1.3.1

asked May 02 '16 by user3560220

People also ask

How do I replace a string in a DataFrame Spark?

By using the regexp_replace() Spark function you can replace a column's string value with another string/substring. regexp_replace() uses Java regular-expression syntax for matching; portions of the string that do not match the pattern are left unchanged.

How do you escape special characters in Spark SQL?

Use backticks (`) to quote identifiers containing special characters in Spark SQL (e.g., `my col`).

How do I change the data type in Spark?

To change a Spark SQL DataFrame column from one data type to another, use the cast() function of the Column class; you can use it with withColumn(), select(), selectExpr(), and in SQL expressions.

What is inferSchema?

inferSchema -> Infer schema automatically guesses the data type of each field. If we set this option to true, the API reads some sample records from the file to infer the schema. If we set it to false, we must specify a schema explicitly.
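For the spark-csv 1.x API used in the question, the two modes can be sketched as follows (the schema and column names here are hypothetical, chosen only for illustration):

```scala
import org.apache.spark.sql.types._

// Mode 1: let the reader sample the file and guess the types.
val inferred = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "true"))

// Mode 2: inferSchema -> "false", so a schema must be supplied explicitly.
val schema = StructType(Seq(
  StructField("id", LongType, nullable = true),
  StructField("code", StringType, nullable = true)))

val explicit = sqlContext.load(
  "com.databricks.spark.csv",
  schema,
  Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "false"))
```

Supplying the schema explicitly avoids the extra sampling pass over the data, which matters at terabyte scale.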


1 Answer

If you can use Spark SQL 1.5 or higher, you may consider using the functions available for columns. Assuming you don't know (or don't have) the names for the columns, you can do as in the following snippet:

val df = test.toDF()

import org.apache.spark.sql.functions._
val newDF = df.withColumn(df.columns(4), regexp_replace(col(df.columns(4)), "[\\r\\n]", "|"))

If you know the name of the column, you can replace df.columns(4) with its name in both occurrences.
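With a known column name, the same call reads as below ("comment" is a hypothetical column name chosen only for illustration):

```scala
import org.apache.spark.sql.functions._

// Replace any carriage return or newline inside the "comment" column with a pipe.
val newDF = df.withColumn("comment", regexp_replace(col("comment"), "[\\r\\n]", "|"))
```

Because withColumn() with an existing column name replaces that column in place, the resulting DataFrame keeps the original column order and can then be saved as a Hive table.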

I hope that helps. Cheers.

answered Sep 21 '22 by Daniel de Paula