I have a smallish dataset that will be the result of a Spark job. I am thinking about converting this dataset to a DataFrame for convenience at the end of the job, but I have struggled to correctly define the schema. The problem is the last field below (topValues); it is an ArrayBuffer of tuples -- keys and counts.
val innerSchema =
  StructType(
    Array(
      StructField("value", StringType),
      StructField("count", LongType)
    )
  )
val outputSchema =
  StructType(
    Array(
      StructField("name", StringType, nullable=false),
      StructField("index", IntegerType, nullable=false),
      StructField("count", LongType, nullable=false),
      StructField("empties", LongType, nullable=false),
      StructField("nulls", LongType, nullable=false),
      StructField("uniqueValues", LongType, nullable=false),
      StructField("mean", DoubleType),
      StructField("min", DoubleType),
      StructField("max", DoubleType),
      StructField("topValues", innerSchema)
    )
  )
val result = stats.columnStats.map { c =>
  Row(c._2.name, c._1, c._2.count, c._2.empties, c._2.nulls,
    c._2.uniqueValues, c._2.mean, c._2.min, c._2.max, c._2.topValues.topN)
}
val rdd = sc.parallelize(result.toSeq)
val outputDf = sqlContext.createDataFrame(rdd, outputSchema)
outputDf.show()
The error I'm getting is a MatchError: scala.MatchError: ArrayBuffer((10,2), (20,3), (8,1)) (of class scala.collection.mutable.ArrayBuffer)
When I debug and inspect my objects, I'm seeing this:
rdd: ParallelCollectionRDD[2]
rdd.data: "ArrayBuffer" size = 2
rdd.data(0): [age,2,6,0,0,3,14.666666666666666,8.0,20.0,ArrayBuffer((10,2), (20,3), (8,1))]
rdd.data(1): [gender,3,6,0,0,2,0.0,0.0,0.0,ArrayBuffer((M,4), (F,2))]
It seems to me that I've accurately described the ArrayBuffer of tuples in my innerSchema, but Spark disagrees.
Any idea how I should be defining the schema?
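A StructType on its own describes a single nested row, not a repeated field, which is why Spark throws a MatchError when it encounters the ArrayBuffer. A collection-valued column has to be declared with ArrayType. A minimal example with a plain integer array: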
val rdd = sc.parallelize(Array(Row(ArrayBuffer(1, 2, 3, 4))))
val df = sqlContext.createDataFrame(
  rdd,
  StructType(Seq(StructField("arr", ArrayType(IntegerType, false), false)))
)
df.printSchema
root
|-- arr: array (nullable = false)
| |-- element: integer (containsNull = false)
df.show
+------------+
| arr|
+------------+
|[1, 2, 3, 4]|
+------------+
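Applying the same idea to the original schema (a sketch, not tested against your stats object): declare topValues as an ArrayType whose element type is innerSchema, and turn each (value, count) tuple into a Row when building the output rows, since a struct element is expected to be a Row.

// In outputSchema, the last field becomes an array of structs:
StructField("topValues", ArrayType(innerSchema))

// Each tuple in topN is converted to a Row matching innerSchema
// (the value must also line up with the declared StringType):
val result = stats.columnStats.map { c =>
  Row(c._2.name, c._1, c._2.count, c._2.empties, c._2.nulls,
    c._2.uniqueValues, c._2.mean, c._2.min, c._2.max,
    c._2.topValues.topN.map { case (value, count) => Row(value, count) })
}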
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val searchPath = "/path/to/.csv"
val columns = "col1,col2,col3,col4,col5,col6,col7"
val fields = columns.split(",").map(fieldName =>
  StructField(fieldName, StringType, nullable = true))
val customSchema = StructType(fields)

val dfPivot = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "false")
  .schema(customSchema)
  .load(searchPath)
Loading the data with a custom schema is much faster than loading it with the default (inferred) schema, since schema inference requires an extra pass over the data.
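For comparison, the inference-based load would look like this (a sketch; the only difference is enabling inferSchema and dropping the explicit schema):

// Slower alternative: let Spark infer the column types, which forces an
// extra pass over the CSV before the DataFrame can be built.
val dfInferred = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .option("inferSchema", "true")
  .load(searchPath)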