I'd like to infer a Spark DataFrame schema from a directory of CSV files using a small subset of the rows (say, limit(100)).
However, setting inferSchema to true means that the Input Size / Records for the FileScanRDD always seems to equal the total number of rows across all the CSV files.
Is there a way to make the FileScan more selective, such that Spark looks at fewer rows when inferring a schema?
Note: setting the samplingRatio option to < 1.0 does not have the desired behaviour, though it is clear that inferSchema uses only the sampled subset of rows.
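For reference, a minimal sketch of the read described above (the directory path is hypothetical); samplingRatio changes which rows inference looks at, but not how much the scan reads:

// Hedged sketch: even with samplingRatio set, the file scan reads every row;
// only the schema-inference step is restricted to the sampled subset.
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("samplingRatio", "0.1")
  .csv("/path/to/csv/dir") // hypothetical path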
You could read a subset of your input data into a Dataset[String]. The csv method allows you to pass such a Dataset as a parameter.
Here is a simple example (I'll leave reading the sample of rows from the input file to you):
val data = List("1,2,hello", "2,3,what's up?")
val csvRDD = sc.parallelize(data)
import spark.implicits._ // needed for .toDS outside of spark-shell
val df = spark.read.option("inferSchema", "true").csv(csvRDD.toDS)
df.schema
When run in spark-shell, the final line above prints the following (reformatted for readability):
res4: org.apache.spark.sql.types.StructType =
StructType(
StructField(_c0,IntegerType,true),
StructField(_c1,IntegerType,true),
StructField(_c2,StringType,true)
)
This is the correct schema for my limited input data set.
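To fill in the part left as an exercise above, here is a minimal sketch, assuming a hypothetical input directory /path/to/csv/dir and an existing SparkSession named spark:

// Read the raw lines as text, keep only the first 100, and infer the
// schema from that small sample instead of from the full files.
val sampleLines = spark.read.textFile("/path/to/csv/dir").limit(100) // hypothetical path
val sampleSchema = spark.read.option("inferSchema", "true").csv(sampleLines).schema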
Assuming you are only interested in the schema, here is a possible approach based on cipri.l's post in this link:
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.StructType
// Note: the two classes below come from Spark's internals, so their signatures may shift between Spark versions.
import org.apache.spark.sql.execution.datasources.csv.{CSVOptions, TextInputCSVDataSource}
def inferSchemaFromSample(sparkSession: SparkSession, fileLocation: String, sampleSize: Int, isFirstRowHeader: Boolean): StructType = {
// Build a Dataset composed of the first sampleSize lines from the input files as plain text strings
val dataSample: Array[String] = sparkSession.read.textFile(fileLocation).head(sampleSize)
import sparkSession.implicits._
val sampleDS: Dataset[String] = sparkSession.createDataset(dataSample)
// Provide information about the CSV files' structure
val firstLine = dataSample.head
val extraOptions = Map("inferSchema" -> "true", "header" -> isFirstRowHeader.toString)
val csvOptions: CSVOptions = new CSVOptions(extraOptions, sparkSession.sessionState.conf.sessionLocalTimeZone)
// Infer the CSV schema based on the sample data
val schema = TextInputCSVDataSource.inferFromDataset(sparkSession, sampleDS, Some(firstLine), csvOptions)
schema
}
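A brief usage sketch (the path is hypothetical; spark is an existing SparkSession): infer the schema once from a small sample, then pass it to the full read so Spark skips inference entirely.

// Hypothetical usage of the function above.
val schema = inferSchemaFromSample(spark, "/path/to/csv/dir", sampleSize = 100, isFirstRowHeader = true)
val df = spark.read.schema(schema).option("header", "true").csv("/path/to/csv/dir")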
Unlike GMc's answer above, this approach directly infers the schema the same way DataFrameReader.csv() does behind the scenes, but without building an additional Dataset with that schema only to read the schema back from it.
The schema is inferred from a Dataset[String] containing only the first sampleSize lines of the input files, read as plain text.
When trying to retrieve samples from data, Spark has only two kinds of methods:
1. Methods that retrieve a given percentage of the data (e.g. sample, or the samplingRatio option), which still scan every input row.
2. Methods that retrieve a specific number of rows (e.g. limit or head), which can be satisfied without touching all of the data.
Since you mentioned you want to use a specific, small number of rows, and since you want to avoid touching all of the data, I provided a solution based on option 2.
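A minimal sketch contrasting the two options (hypothetical path; spark is an existing SparkSession):

// Option 1: percentage-based sampling still scans every input row.
val pctSample = spark.read.textFile("/path/to/csv/dir").sample(withReplacement = false, fraction = 0.01)
// Option 2: a fixed row count can be satisfied without reading everything.
val rowSample = spark.read.textFile("/path/to/csv/dir").limit(100)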
PS: The DataFrameReader.textFile method accepts paths to files and folders, and it also has a varargs variant, so you can pass in one or more files or folders.
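For example (hypothetical paths):

// textFile accepts one or more files and/or folders.
val lines = spark.read.textFile("/data/one.csv", "/data/more-csvs/")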