
How to process multi line input records in Spark

Each record in my input file (a very large file) is spread across multiple lines.

Ex:

Id:   2
ASIN: 0738700123
  title: Test tile for this product
  group: Book
  salesrank: 168501
  similar: 5  0738700811  1567184912  1567182813  0738700514  0738700915
  categories: 2
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Wicca[12484]
   |Books[283155]|Subjects[1000]|Religion & Spirituality[22]|Earth-Based Religions[12472]|Witchcraft[12486]
  reviews: total: 12  downloaded: 12  avg rating: 4.5
    2001-12-16  cutomer: A11NCO6YTE4BTJ  rating: 5  votes:   5  helpful:   4
    2002-1-7  cutomer:  A9CQ3PLRNIR83  rating: 4  votes:   5  helpful:   5

How can I identify and process each multi-line record in Spark?

asked Dec 18 '14 by Vijay Innamuri

People also ask

How do I write multiple lines in Spark SQL?

You can use triple quotes at the start and end of the SQL string, or a backslash at the end of each line. Triple quotes (both double and single) work in Python as well, and are generally preferred over backslash continuations.
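For example, a minimal PySpark sketch (the table and column names here are made up):

# Triple-quoted string: the SQL can span several lines.
df = spark.sql("""
    SELECT id, title
    FROM products
    WHERE group_name = 'Book'
""")

# Backslash continuation also works, joining adjacent string literals,
# but a trailing space after the backslash is a syntax error.
df = spark.sql("SELECT id, title " \
               "FROM products")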

What is the multiline option in Spark?

Spark's JSON data source provides the multiline option for reading records that span multiple lines. By default, Spark expects each line of a JSON file to contain one complete record (JSON Lines format), so the multiline option is needed to parse JSON that spreads a record over several lines.
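For example (hypothetical file paths):

# Default: one complete JSON record per line (JSON Lines).
df = spark.read.json("/path/records.jsonl")

# With multiLine enabled, records may span several lines,
# e.g. a pretty-printed JSON document or array.
df = spark.read.option("multiLine", "true").json("/path/records.json")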

How do you write multiple lines in PySpark?

You can use either a backslash or parentheses to break long lines in PySpark, just as you would in Python.
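For example (hypothetical path and column names):

# Backslash at the end of each line:
df = spark.read.csv("/path/data.csv", header=True) \
    .select("id", "title") \
    .filter("salesrank > 0")

# Parentheses, which avoid trailing backslashes:
df = (spark.read.csv("/path/data.csv", header=True)
      .select("id", "title")
      .filter("salesrank > 0"))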


1 Answer

If the multi-line data has a defined record separator, you can use Hadoop's support for custom record delimiters, providing the separator through a Hadoop Configuration object:

Something like this should do:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration
// The delimiter is case-sensitive, so it must match the file
// exactly ("Id:" in the sample above).
conf.set("textinputformat.record.delimiter", "Id:")
val dataset = sc.newAPIHadoopFile("/path/to/data",
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
// Keep only the record text; converting Text to String right away also
// avoids problems with Hadoop reusing the same Writable instance.
val data = dataset.map { case (_, text) => text.toString }

This will give you an RDD[String] where each element corresponds to one record. You then need to parse each record according to your application's requirements.
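For illustration, a rough PySpark equivalent plus a naive parser; the path is a placeholder, and the parsing rules are assumptions based on the sample record (the categories and reviews lines would need more specific handling in a real parser):

conf = {"textinputformat.record.delimiter": "Id:"}
raw = sc.newAPIHadoopFile(
    "/path/to/data",  # placeholder path
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf=conf)

def parse(record):
    lines = record.strip().splitlines()
    # The "Id:" delimiter itself is consumed by the input format,
    # so the first line of each record is the bare id value.
    fields = {"Id": lines[0].strip()}
    for line in lines[1:]:
        key, _, value = line.strip().partition(":")
        if value:
            fields[key.strip()] = value.strip()
    return fields

records = (raw.map(lambda kv: kv[1])        # drop the byte-offset key
              .filter(lambda s: s.strip())  # drop the empty split before the first "Id:"
              .map(parse))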

answered Sep 21 '22 by maasg