I'm trying to load some csv data into a spark cluster and run some queries on it, but i'm running into problems getting the data loaded.
See code sample below - I've generated a header and am trying to parse the columns, but the process fails when running against the (large, column rich) data set with an obfuscated error message: 'java.lang.String is not a valid external type for schema of string'
This doesn't seem to be solved elsewhere on the internet - any one know what the problem might be?
(I originally thought this might be related to null or empty fields being loaded, but the process fails after some time, and the source data is very very sparse)
var headers = StructType(header_clean.split(",").map(fieldName ⇒ StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
p => Row(p.map( x => x.replace("\"", "").trim)))
contentRdd.createOrReplaceTempView("someView")
val domains = spark.sql("SELECT DISTINCT domain FROM someView")
For reference, bottom of error log (very spammy, lots of columns
if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true) AS pageUrl#377
+- if (assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt) null else staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true) :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object).isNullAt : :- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object) : : +- input[0, org.apache.spark.sql.Row, true] :
+- 87 :- null +- staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType), true)
+- validateexternaltype(getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl), StringType)
+- getexternalrowfield(assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object), 87, pageUrl)
+- assertnotnull(input[0, org.apache.spark.sql.Row, true], top level row object)
+- input[0, org.apache.spark.sql.Row, true] at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:279) at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) at org.apache.spark.sql.SparkSession$$anonfun$5.apply(SparkSession.scala:537) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) ... 3 more Caused by: java.lang.RuntimeException: [Ljava.lang.String; is not a valid external type for schema of string at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.toRow(ExpressionEncoder.scala:276) ... 17 more
I solved this problem by split the element of Row. You can do this:
StructType(header_clean.split(",").map(fieldName ⇒StructField(fieldName, StringType, true)))
var contentRdd = contentNoHeader.map(k => k.split(",")).map(
p => {
val ppp = p.map( x => x.replace("\"", "").trim)
Row(ppp(0),ppp(1),ppp(2))
})
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With