I am using a small test CSV file just to illustrate the problem. In the real scenario, the job has to handle more than a terabyte of data.
I have a CSV file whose columns are enclosed in quotes ("col1"). When the data was imported, one column ended up containing newline characters (\n). This causes a lot of problems when I try to save the data as Hive tables.
My idea was to replace the \n character with a pipe ("|") in Spark.
What I have so far:
// 1. Read the CSV file
val test = sqlContext.load(
  "com.databricks.spark.csv",
  Map("path" -> "test_set.csv", "header" -> "true", "inferSchema" -> "true", "delimiter" -> ",", "quote" -> "\"", "escape" -> "\\", "parserLib" -> "univocity"))
// 2. Convert to a DataFrame
val dataframe = test.toDF()
// 3. Print the rows
dataframe.foreach(println)
// 4. Replace the newline in column 4 (this is the step that is not working for me)
dataframe.map(row => {
  val row4 = row.getAs[String](4)
  val make = row4.replaceAll("[\r\n]", "|")
  make
}).collect().foreach(println)
Sample set:
(17 , D73 ,525, 1 ,testing\n , 90 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,526, 1 ,null , 89 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,529, 1 ,once \n again, 10 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,531, 1 ,test3\n , 10 ,20.07.2011 ,null ,F10 , R)
Expected result set:
(17 , D73 ,525, 1 ,testing| , 90 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,526, 1 ,null , 89 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,529, 1 ,once | again, 10 ,20.07.2011 ,null ,F10 , R)
(17 , D73 ,531, 1 ,test3| , 10 ,20.07.2011 ,null ,F10 , R)
What worked for me:
val rep = "\n123\n Main Street\n".replaceAll("[\\r\\n]", "|")
rep: String = |123| Main Street|
But why am I not able to do the same on a tuple basis?
val dataRDD = lines_wo_header.map(line => line.split(";")).map(row => (
  row(0).toLong, row(1).toString,
  row(2).toLong, row(3).toLong,
  row(4).toString, row(5).toLong,
  row(6).toString, row(7).toString, row(8).toString, row(9).toString))
dataRDD.map(row => {
  val wert = row._5.replaceAll("[\\r\\n]", "|")
  (row._1, row._2, row._3, row._4, wert, row._6, row._7, row._8, row._9, row._10)
}).collect().foreach(println)
Spark version: 1.3.1
With Spark's regexp_replace() function you can replace the parts of a column's string value that match a pattern with another string. regexp_replace() uses Java regex for matching; if the regex does not match, the original value is returned unchanged.
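Since the matching is done with Java regex, the pattern can be checked outside Spark first. The following is only a minimal sketch in plain Scala that uses String.replaceAll as a stand-in for regexp_replace() (the sample strings are made up); it also shows that the two spellings of the pattern, "[\\r\\n]" and "[\r\n]", match the same characters:

```scala
// Regex escapes: the regex engine sees \r and \n inside the character class.
val pattern = "[\\r\\n]"

// A value containing a newline: the newline is replaced by a pipe.
val replaced = "once \n again".replaceAll(pattern, "|")

// A value without any line break: nothing matches, so it comes back unchanged.
val untouched = "no line breaks".replaceAll(pattern, "|")

// Literal CR/LF characters inside the class behave the same way.
val literalForm = "test3\r\n".replaceAll("[\r\n]", "|")

println(replaced)    // once | again
println(untouched)   // no line breaks
println(literalForm) // test3||
```

So the regex itself is probably not what makes the replacement in the question fail.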
Use a backslash (\) to escape special characters in string literals, which is why the pattern is written as "[\\r\\n]" in Scala source.
To change a Spark SQL DataFrame column from one data type to another, use the cast() function of the Column class; it can be used with withColumn(), select(), selectExpr(), and SQL expressions.
inferSchema -> When this option is set to true, the reader guesses the data type of each field automatically by reading records from the file. When it is false, the columns are read as strings unless a schema is specified explicitly.
If you can use Spark SQL 1.5 or higher, you may consider using the functions available for columns. Assuming you don't know (or don't have) the names for the columns, you can do as in the following snippet:
val df = test.toDF()
import org.apache.spark.sql.functions._
val newDF = df.withColumn(df.columns(4), regexp_replace(col(df.columns(4)), "[\\r\\n]", "|"))
If you know the name of the column, you can replace df.columns(4) with that name in both occurrences.
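If you have to stay on Spark 1.3.1, where regexp_replace() is not available, a row-level replace can still be used. One thing to watch for: the sample set has a null in that column (row 526), and if that is a real null rather than the string "null", calling replaceAll on it throws a NullPointerException. Here is a minimal null-safe sketch in plain Scala; the helper name replaceNewlines and the shortened three-field tuples are made up for illustration:

```scala
// Hypothetical helper: replaces CR/LF with a pipe and passes null through untouched.
def replaceNewlines(s: String): String =
  Option(s).map(_.replaceAll("[\\r\\n]", "|")).orNull

// Made-up stand-in for three of the sample rows: (id, text, count).
val rows = Seq(
  (525L, "testing\n", 90L),
  (526L, null: String, 89L),
  (529L, "once \n again", 10L)
)

// Rebuild each tuple with the cleaned text field, as in the tuple-based attempt.
val cleaned = rows.map { case (id, text, count) => (id, replaceNewlines(text), count) }

cleaned.foreach(println)
```

Inside Spark, the same helper could be called from the map in the question, for example on row.getAs[String](4).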
I hope that helps. Cheers.