 

Spark infer schema with limit during a read.csv

Tags:

apache-spark

I'd like to infer a Spark DataFrame schema from a directory of CSV files using only a small subset of the rows (say, limit(100)).

However, with inferSchema set to true, the Input Size / Records for the FileScanRDD always seems to equal the total number of rows across all the CSV files.

Is there a way to make the FileScan more selective, such that Spark looks at fewer rows when inferring a schema?

Note: setting the samplingRatio option to a value < 1.0 does not have the desired behaviour, even though inferSchema clearly uses only the sampled subset of rows.
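
For illustration, a read along these lines (the path is a placeholder) still shows the full row count in the FileScan, even with samplingRatio set:

// Hypothetical example: inferSchema scans every row;
// samplingRatio only controls how many of the scanned rows are used for inference
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("samplingRatio", "0.1")
  .csv("/path/to/csv/dir")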

asked Mar 04 '23 by Jedi

2 Answers

You could read a subset of your input data into a Dataset[String]. The csv method accepts such a Dataset as a parameter.

Here is a simple example (I'll leave reading the sample of rows from the input file to you):

// A couple of sample rows as plain CSV strings
val data = List("1,2,hello", "2,3,what's up?")
// Turn them into a Dataset[String] and let csv() infer the schema from it
// (in spark-shell; in an application you would need import spark.implicits._ for .toDS)
val csvRDD = sc.parallelize(data)
val df = spark.read.option("inferSchema", "true").csv(csvRDD.toDS)
df.schema

When run in spark-shell, the final line from the above prints (I reformatted it for readability):

res4: org.apache.spark.sql.types.StructType = 
    StructType(
      StructField(_c0,IntegerType,true),
      StructField(_c1,IntegerType,true),
      StructField(_c2,StringType,true)
    )

That is the correct schema for my limited input data set.
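
To get that sample of rows from an actual input directory, one option (a sketch, assuming the files can first be read as plain text and that the first 100 lines are representative) is to take the leading raw lines and feed them to csv():

// Sketch: read the raw lines, keep only a small sample, and infer the schema from it
val sampleLines = spark.read.textFile("/path/to/csv/dir").limit(100)
val sampleSchema = spark.read.option("inferSchema", "true").csv(sampleLines).schema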

answered Mar 27 '23 by GMc


Assuming you are only interested in the schema, here is a possible approach based on cipri.l's post:

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.execution.datasources.csv.{CSVOptions, TextInputCSVDataSource}

def inferSchemaFromSample(sparkSession: SparkSession, fileLocation: String, sampleSize: Int, isFirstRowHeader: Boolean): StructType = {
  // Build a Dataset composed of the first sampleSize lines from the input files as plain text strings
  val dataSample: Array[String] = sparkSession.read.textFile(fileLocation).head(sampleSize)
  import sparkSession.implicits._
  val sampleDS: Dataset[String] = sparkSession.createDataset(dataSample)
  // Provide information about the CSV files' structure
  val firstLine = dataSample.head
  val extraOptions = Map("inferSchema" -> "true", "header" -> isFirstRowHeader.toString)
  val csvOptions: CSVOptions = new CSVOptions(extraOptions, sparkSession.sessionState.conf.sessionLocalTimeZone)
  // Infer the CSV schema based on the sample data
  val schema = TextInputCSVDataSource.inferFromDataset(sparkSession, sampleDS, Some(firstLine), csvOptions)
  schema
}

Unlike GMc's answer above, this approach infers the schema directly, the same way DataFrameReader.csv() does behind the scenes, but without building an additional Dataset with that schema only to read the schema back from it.

The schema is inferred based on a Dataset[String] containing only the first sampleSize lines from the input files as plain text strings.
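
A possible usage (the path and sample size are placeholders): infer the schema once from a small sample, then read the full directory with inference disabled:

// Hypothetical usage: infer from the first 100 lines only,
// then reuse the schema so the full read skips inference entirely
val schema = inferSchemaFromSample(spark, "/path/to/csv/dir", sampleSize = 100, isFirstRowHeader = true)

val df = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("/path/to/csv/dir")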

When it comes to retrieving samples from data, Spark offers only two kinds of methods:

  1. Methods that retrieve a given percentage of the data. This operation takes random samples from all partitions; it benefits from higher parallelism, but it must read all the input files.
  2. Methods that retrieve a specific number of rows. This operation must collect the data on the driver, but it can read as little as a single partition if the required row count is low enough (both flavours are sketched below).
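
Roughly, the two flavours look like this (a sketch using the Dataset API, with a placeholder path):

// 1. Fraction-based: a random sample across all partitions, so every input file is read
val byFraction = spark.read.textFile("/path/to/csv/dir").sample(withReplacement = false, fraction = 0.01)

// 2. Count-based: head(n) collects the first n rows to the driver and can stop
//    as soon as enough partitions have been read
val byCount: Array[String] = spark.read.textFile("/path/to/csv/dir").head(100)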

Since you mentioned wanting a specific, small number of rows, and wanting to avoid touching all the data, I provided a solution based on option 2.

PS: The DataFrameReader.textFile method accepts paths to files and folders, and it also has a varargs variant, so you can pass in one or more files or folders.
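
For example (hypothetical paths):

// One folder and one extra file in the same call, thanks to the varargs overload
val lines = spark.read.textFile("/data/csv/batch1", "/data/csv/extra/file.csv")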

answered Mar 27 '23 by SilviuC