How to read multiple text files into a single RDD?

Tags:

apache-spark



You can specify whole directories, use wildcards, and even pass a comma-separated list of directories and wildcards. E.g.:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

As Nick Chammas points out, this behaviour comes straight from Hadoop's FileInputFormat, so the same path syntax also works with Hadoop (and Scalding).
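A minimal sketch of the comma-separated form, assuming Spark is on the classpath and the default file system is the local one. The object name `MultiPathRead`, the helper `joinPaths`, the temporary directories, and the `local[*]` master are all hypothetical choices made for illustration, not part of the original answer.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object MultiPathRead {
  // Join several paths into the single comma-separated string that textFile accepts.
  def joinPaths(paths: Seq[String]): String = paths.mkString(",")

  def main(args: Array[String]): Unit = {
    // Local master and app name are assumptions for a standalone run.
    val conf = new SparkConf().setMaster("local[*]").setAppName("multi-path-read")
    val sc = new SparkContext(conf)
    try {
      // Two throwaway directories with one small file each (stand-ins for real input).
      val dirA = Files.createTempDirectory("dirA")
      val dirB = Files.createTempDirectory("dirB")
      Files.write(dirA.resolve("part-000.txt"), "a1\na2\n".getBytes(StandardCharsets.UTF_8))
      Files.write(dirB.resolve("part-001.txt"), "b1\n".getBytes(StandardCharsets.UTF_8))

      // One whole directory plus one wildcard pattern, read in a single call.
      val path = joinPaths(Seq(dirA.toString, dirB.toString + "/part-00[0-5]*"))
      val lines = sc.textFile(path)

      // Prints the total number of lines across both inputs.
      println(lines.count())
    } finally {
      sc.stop()
    }
  }
}
```

Keeping the path building in a small helper makes it easy to assemble the list programmatically before handing it to textFile.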


Use union as follows:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

bigRdd is then a single RDD containing the lines of all the files.
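When the paths are already in a collection, the same union idea can be sketched without naming each RDD by hand. The object `UnionRead` and the helper `readAll` are hypothetical names introduced here, and an existing SparkContext is assumed.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object UnionRead {
  // Read each path into its own RDD, then merge them with one SparkContext.union call.
  def readAll(sc: SparkContext, paths: Seq[String]): RDD[String] =
    sc.union(paths.map(p => sc.textFile(p)))
}
```

With the placeholder names from the answer above, this would be called as UnionRead.readAll(sc, Seq("xxx1", "xxx2")).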


You can use a single textFile call to read multiple files by joining the paths with commas. Python:

sc.textFile(','.join(files))

The Scala equivalent is sc.textFile(files.mkString(",")).

You can use this approach for files in S3.

First, get a Buffer/List of S3 paths:

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing}

def listFiles(s3_bucket: String, base_prefix: String): Seq[String] = {
  val files = ArrayBuffer.empty[String]

  // S3 client and list-objects request
  val s3Client = new AmazonS3Client()
  val listObjectsRequest = new ListObjectsRequest()
  var objectListing: ObjectListing = null

  // Your S3 bucket
  listObjectsRequest.setBucketName(s3_bucket)

  // Your folder path or prefix
  listObjectsRequest.setPrefix(base_prefix)

  // Page through the listing, prepending s3:// to each key
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries.asScala) {
      val key = objectSummary.getKey
      // Skip "folder" placeholder keys such as the base directory itself,
      // rather than blindly dropping the first entry of the list
      if (!key.endsWith("/")) files += "s3://" + s3_bucket + "/" + key
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker)
  } while (objectListing.isTruncated)

  // Return an immutable Scala list of paths
  files.toList
}

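Now pass this list to the following piece of code. Note: sc is a SparkContext, and textFile returns an RDD[String], so the pieces are combined with union (unionAll is a DataFrame method and does not apply here).

import org.apache.spark.rdd.RDD

var rdd: RDD[String] = null
for (file <- files) {
  // Read one file and fold it into the running union
  val fileRdd = sc.textFile(file)
  if (rdd != null) {
    rdd = rdd.union(fileRdd)
  } else {
    rdd = fileRdd
  }
}

You now have a single unified RDD, rdd.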

Optionally, you can also repartition it into a single partition:

val files = sc.textFile(filename, 1).repartition(1)

Repartitioning always works :D