So i was trying to load the csv file inferring custom schema but everytime i end up with the following errors:
Project_Bank.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [110, 111, 13, 10]
This is how my program looks like and my csv file entries ,
age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y 58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no 44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no 33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
$spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val bankSchema = StructType(Array(
StructField("age", IntegerType, true),
StructField("job", StringType, true),
StructField("marital", StringType, true),
StructField("education", StringType, true),
StructField("default", StringType, true),
StructField("balance", IntegerType, true),
StructField("housing", StringType, true),
StructField("loan", StringType, true),
StructField("contact", StringType, true),
StructField("day", IntegerType, true),
StructField("month", StringType, true),
StructField("duration", IntegerType, true),
StructField("campaign", IntegerType, true),
StructField("pdays", IntegerType, true),
StructField("previous", IntegerType, true),
StructField("poutcome", StringType, true),
StructField("y", StringType, true)))
val df = sqlContext.
read.
schema(bankSchema).
option("header", "true").
option("delimiter", ";").
load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()
df.registerTempTable("people")
df.printSchema()
val distinctage = sqlContext.sql("select distinct age from people")
Any suggestion as why am not able to work with the csv file here after pushing the correct schema. Thanks in advance for your advise.
Thanks Amit K
The parquet file format contains a 4-byte magic number in the header (PAR1) and at the end of the footer. This is a magic number indicates that the file is in parquet format. All the file metadata is stored in the footer section.
Parquet files are composed of row groups, header and footer. Each row group contains data from the same columns. The same columns are stored together in each row group: This structure is well-optimized both for fast query performance, as well as low I/O (minimizing the amount of data scanned).
Here the problem is Data Frame expects Parquet file while processing it. In order to handle data in CSV. Here what you can do.
First of all, remove the header row from the data.
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
Next we write following code to read the data.
Create case class
case class BankSchema(age: Int, job: String, marital:String, education:String, default:String, balance:Int, housing:String, loan:String, contact:String, day:Int, month:String, duration:Int, campaign:Int, pdays:Int, previous:Int, poutcome:String, y:String)
Read data from HDFS and parse it
val bankData = sc.textFile("/user/myuser/Project_Bank.csv").map(_.split(";")).map(p => BankSchema(p(0).toInt, p(1), p(2),p(3),p(4), p(5).toInt, p(6), p(7), p(8), p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt, p(15), p(16))).toDF()
And then register table and execute queries.
bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")
Here is what the output would look like
+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+
Here the expected file format is csv
but as per error its looking for parquet
file format.
This can be overcome by explicitly mentioning the file format as below (which was missing in the problem shared) because if we don't specify the file format then it by default expects Parquet
format.
As per Java code version (sample example):
Dataset<Row> resultData = session.read().format("csv")
.option("sep", ",")
.option("header", true)
.option("mode", "DROPMALFORMED")
.schema(definedSchema)
.load(inputPath);
Here, schema can be defined either by using a java class (ie. POJO class)
or by using StructType
as already mentioned.
And inputPath is the path of input csv
file.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With