I am trying to read a CSV file into a DataFrame. I know what the schema of my DataFrame should be, since I know my CSV file, and I am using the spark-csv package to read the file. I am trying to specify the schema as below.
val pagecount = sqlContext.read.format("csv")
  .option("delimiter", " ").option("quote", "")
  .option("schema", "project: string ,article: string ,requests: integer ,bytes_served: long")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
But when I check the schema of the DataFrame I created, it seems to have taken its own schema. Am I doing anything wrong? How do I make Spark pick up the schema I specified?
> pagecount.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
We can create a DataFrame programmatically using the following three steps (a sketch follows the list):

1. Create an RDD of Rows from the original RDD.
2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD created in step 1.
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
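As a concrete illustration of those three steps, here is a minimal sketch using the space-delimited pagecounts file and the column names from the question; it assumes a Spark shell where sc and sqlContext are already in scope.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Step 1: build an RDD of Rows from the raw text lines; the field
// positions and types mirror the schema the question asked for.
val rowRDD = sc.textFile("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
  .map(_.split(" "))
  .map(fields => Row(fields(0), fields(1), fields(2).toInt, fields(3).toLong))

// Step 2: a StructType matching the structure of those Rows.
val schema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", LongType, true)))

// Step 3: apply the schema to the RDD of Rows.
val pagecount = sqlContext.createDataFrame(rowRDD, schema)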
Apache PySpark provides csv("path") for reading a CSV file into a Spark DataFrame and dataframeObj.write.csv("path") for saving or writing to a CSV file. It supports reading pipe-, comma-, tab-, and other delimiter/separator-delimited files.
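The same csv("path") reader and writer calls exist in the Scala API used elsewhere in this thread. A minimal round-trip sketch, where "input.csv" and "output_dir" are placeholder paths:

// Read a delimiter-separated file into a DataFrame, then write it back out.
val df = spark.read
  .option("delimiter", "|")   // pipe-separated here; "," is the default, "\t" for tabs
  .option("header", "true")
  .csv("input.csv")

df.write
  .option("header", "true")
  .csv("output_dir")          // Spark writes a directory of part files, not a single file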
Try the code below; you need not specify the schema. When you set inferSchema to true, Spark should take it from your CSV file (at the cost of an extra pass over the data to infer the column types).
val pagecount = sqlContext.read.format("csv")
  .option("delimiter", " ").option("quote", "")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
If you want to specify the schema manually, pass it with the .schema() method rather than as an option (options are plain string key/value pairs, so an unrecognized "schema" key is silently ignored, which is why your original code fell back to _c0, _c1, ...):
import org.apache.spark.sql.types._

val customSchema = StructType(Array(
  StructField("project", StringType, true),
  StructField("article", StringType, true),
  StructField("requests", IntegerType, true),
  StructField("bytes_served", DoubleType, true)))

val pagecount = sqlContext.read.format("csv")
  .option("delimiter", " ").option("quote", "")
  .option("header", "true")
  .schema(customSchema)
  .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
I'm using the solution provided by Arunakiran Nulu in my analysis (see the code). Although it assigns the correct types to the columns, all the values returned are null. Previously, I tried the option .option("inferSchema", "true"), and it returned the correct values in the DataFrame (although with different types).
val customSchema = StructType(Array(
  StructField("numicu", StringType, true),
  StructField("fecha_solicitud", TimestampType, true),
  StructField("codtecnica", StringType, true),
  StructField("tecnica", StringType, true),
  StructField("finexploracion", TimestampType, true),
  StructField("ultimavalidacioninforme", TimestampType, true),
  StructField("validador", StringType, true)))

val df_explo = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .schema(customSchema)
  .load(filename)
Result
root
 |-- numicu: string (nullable = true)
 |-- fecha_solicitud: timestamp (nullable = true)
 |-- codtecnica: string (nullable = true)
 |-- tecnica: string (nullable = true)
 |-- finexploracion: timestamp (nullable = true)
 |-- ultimavalidacioninforme: timestamp (nullable = true)
 |-- validador: string (nullable = true)
and the table is:
+------+---------------+----------+-------+--------------+-----------------------+---------+
|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador|
+------+---------------+----------+-------+--------------+-----------------------+---------+
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
|  null|           null|      null|   null|          null|                   null|     null|
+------+---------------+----------+-------+--------------+-----------------------+---------+
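A likely cause, though the thread does not confirm it: in Spark's default PERMISSIVE mode, any field that cannot be parsed with the declared type comes back as null, so a timestampFormat (or delimiter) that does not match the file would null out entire rows. A minimal diagnostic sketch, reusing customSchema and filename from the snippet above and keeping the raw line in a _corrupt_record column so you can see what failed to parse:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Extend the schema with a column that receives the raw input line for
// rows the parser could not handle, instead of dropping them silently.
val schemaWithCorrupt = customSchema.add(StructField("_corrupt_record", StringType, true))

val df_debug = spark.read
  .format("csv")
  .option("header", "true")
  .option("delimiter", "\t")
  .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .schema(schemaWithCorrupt)
  .load(filename)

// Any non-null _corrupt_record is a row the declared schema could not parse.
df_debug.filter(col("_corrupt_record").isNotNull).show(false)

If every row shows up as corrupt, compare the raw lines against the declared delimiter and timestamp pattern and adjust those options accordingly.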