
How to use sqlContext to load multiple parquet files?

I'm trying to load a directory of Parquet files in Spark but can't get it to work. Loading a single partition directory works:

val df = sqlContext.load("hdfs://nameservice1/data/rtl/events/stream/loaddate=20151102")

But loading with a wildcard doesn't work:

val df = sqlContext.load("hdfs://nameservice1/data/rtl/events/stream/loaddate=201511*")

It fails with this error:

java.io.FileNotFoundException: File does not exist: hdfs://nameservice1/data/rtl/events/stream/loaddate=201511*

How do I get it to work with a wildcard?

lightweight asked Jan 07 '23 03:01


2 Answers

You can get the list of files or folders with the Hadoop FileSystem listStatus call, then read each one you want as a DataFrame. Finally, use reduce with unionAll to combine them all into a single DataFrame.

Get the files/folders:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
val fs = FileSystem.get(new Configuration())
val status = fs.listStatus(new Path(YOUR_HDFS_PATH))

Read in the data:

val parquetFiles = status.map { folder =>
  sqlContext.read.parquet(folder.getPath.toString)
}

Merge the data into a single DataFrame:

val mergedFile = parquetFiles.reduce((x, y) => x.unionAll(y))
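
As a rough sketch, the three steps above could be combined into one helper that also keeps only the directories whose names start with a given prefix, which is what the loaddate=201511* wildcard in the question is after. The names LoadPartitions, matchesPrefix, and loadMatching are made up for illustration, and the code assumes a Spark 1.4+ SQLContext (for sqlContext.read) where unionAll is available:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SQLContext}

// Hypothetical helper object, not part of Spark or Hadoop.
object LoadPartitions {
  // True when a directory name starts with the wanted partition prefix,
  // e.g. "loaddate=20151102" matches the prefix "loaddate=201511".
  def matchesPrefix(dirName: String, prefix: String): Boolean =
    dirName.startsWith(prefix)

  // Lists the children of `root`, keeps the matching directories,
  // reads each one as Parquet, and unions the results.
  def loadMatching(sqlContext: SQLContext, root: String, prefix: String): DataFrame = {
    val rootPath = new Path(root)
    val fs = rootPath.getFileSystem(new Configuration())
    val dirs = fs.listStatus(rootPath)
      .filter(s => s.isDirectory && matchesPrefix(s.getPath.getName, prefix))
      .map(_.getPath.toString)
    // reduce throws on an empty collection, so fail early with a clearer message.
    require(dirs.nonEmpty, s"no directories under $root start with $prefix")
    dirs.map(d => sqlContext.read.parquet(d)).reduce(_ unionAll _)
  }
}
```

Called as LoadPartitions.loadMatching(sqlContext, "hdfs://nameservice1/data/rtl/events/stream", "loaddate=201511"), this would union every partition directory for November 2015. unionAll matches columns by position, so the sketch assumes all of the directories share the same schema.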

You can also have a look at my past posts around the same topic.

Spark Scala list folders in directory

Spark/Scala flatten and flatMap is not working on DataFrame

AlexL answered Jan 10 '23 20:01


If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.

For example (note the "/" between the base path and the partition pattern):

val basePath = "hdfs://nameservice1/data/rtl/events/stream"

sparkSession.read.option("basePath", basePath).parquet(basePath + "/loaddate=201511*")
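
A minimal sketch of the same idea wrapped in a helper, assuming Spark 2.x or later (for SparkSession). The names LoadWithBasePath, partitionGlob, and load are hypothetical. The helper joins the table root and the partition pattern with exactly one slash, and setting basePath to the root should keep loaddate as a column in the result:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object, not part of Spark.
object LoadWithBasePath {
  // Joins the table root and a partition glob with exactly one slash,
  // whether or not the root already ends with one.
  def partitionGlob(basePath: String, pattern: String): String =
    basePath.stripSuffix("/") + "/" + pattern

  // Reads every partition directory matching `pattern` under `basePath`,
  // passing the root as the "basePath" option for partition discovery.
  def load(spark: SparkSession, basePath: String, pattern: String): DataFrame =
    spark.read
      .option("basePath", basePath)
      .parquet(partitionGlob(basePath, pattern))
}
```

Usage would look like LoadWithBasePath.load(sparkSession, "hdfs://nameservice1/data/rtl/events/stream", "loaddate=201511*").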

bruse answered Jan 10 '23 18:01