I'm trying to calculate Jaccard index on Spark SQL. My table on Hive
has the following data:
hive> select * from test_1;
1 ["rock","pop"]
2 ["metal","rock"]
Table DDL:
create table test_1
(id int, val array<string>);
I'm using the UDF
from Brickhouse. From spark-shell
, I'm able to execute the following commands to create the temporary function.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.hql("CREATE TEMPORARY FUNCTION jaccard_similarity AS 'brickhouse.udf.sketch.SetSimilarityUDF'")
I have also added the .jar
file to the CLASSPATH
for spark-shell
(in compute-classpath.sh
).
When I list the functions, I'm able to see the new function that I have created.
hiveContext.hql("show functions").collect().foreach(println)
Next, I use the jaccard_similarity
function to calculate the Jaccard Index for the val
array.
hiveContext.hql("select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b")
I'm getting the following error:
14/07/31 15:39:56 INFO ParseDriver: Parsing command: select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b
14/07/31 15:39:56 INFO ParseDriver: Parse Completed
14/07/31 15:39:56 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/31 15:39:56 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/31 15:39:56 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/07/31 15:39:56 INFO audit: ugi=username ip=unknown-ip-addr cmd=get_table : db=default tbl=test_1
14/07/31 15:39:56 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/07/31 15:39:56 INFO audit: ugi=username ip=unknown-ip-addr cmd=get_table : db=default tbl=test_1
scala.MatchError: ArrayType(StringType) (of class org.apache.spark.sql.catalyst.types.ArrayType)
at org.apache.spark.sql.hive.HiveInspectors$typeInfoConversions.toTypeInfo(hiveUdfs.scala:382)
at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveFunctionRegistry$.lookupFunction(hiveUdfs.scala:52)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:131)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:129)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:52)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:65)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:129)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:127)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:126)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:62)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:60)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:60)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:52)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:52)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:313)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:313)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:248)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:247)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:316)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:316)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:319)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:319)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.simpleString(HiveContext.scala:315)
at org.apache.spark.sql.SchemaRDDLike$class.toString(SchemaRDDLike.scala:67)
at org.apache.spark.sql.SchemaRDD.toString(SchemaRDD.scala:100)
at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
at .<init>(<console>:10)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I looked at the Spark
source code from GitHub. In datatypes.scala, there's the following code:
protected lazy val arrayType: Parser[DataType] =
"ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
I couldn't find any reference to array
being not supported by Spark SQL. It would be great if anyone can share any pointers on how to get this working.
Also, the function works perfectly from Hive
shell.
Update (5th August):
I just build Spark from the Master branch on Github. The error message has some more info (like scala.MatchError: ArrayType(StringType,false)
instead of scala.MatchError: ArrayType(StringType)
)
scala> hiveContext.hql("select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
14/08/05 13:54:53 INFO ParseDriver: Parsing command: select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b
14/08/05 13:54:53 INFO ParseDriver: Parse Completed
14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO audit: ugi=chandrv1 ip=unknown-ip-addr cmd=get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO audit: ugi=chandrv1 ip=unknown-ip-addr cmd=get_table : db=default tbl=test_1
scala.MatchError: ArrayType(StringType,false) (of class org.apache.spark.sql.catalyst.types.ArrayType)
at org.apache.spark.sql.hive.HiveInspectors$typeInfoConversions.toTypeInfo(HiveInspectors.scala:216)
at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:52)
at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:253)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:41)
at org.apache.spark.sql.hive.HiveContext$$anon$3.lookupFunction(HiveContext.scala:253)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:131)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:129)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:52)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:65)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:70)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:41)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:129)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:127)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:127)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:126)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:394)
at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:394)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:350)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:349)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:399)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:397)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:403)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:403)
at org.apache.spark.sql.hive.HiveContext$QueryExecution.simpleString(HiveContext.scala:419)
at org.apache.spark.sql.SchemaRDDLike$class.toString(SchemaRDDLike.scala:67)
at org.apache.spark.sql.SchemaRDD.toString(SchemaRDD.scala:103)
at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
at .<init>(<console>:10)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1061)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
I also looked at HiveInspectors.scala (line 212, typeInfoConversions
). It seems that ArrayType
isn't defined there.
Sorry, my reputation on StackOverflow is not high enough to leave a comment. Hope this "answer" reaches you well. Anyway, I'm playing on SparkSQL using HiveContext and noticed quite a similar behaviour with ArrayType. Although it does not solve your problem, it might explain why
It turns out that ArrayType are supported on HiveContext (Spark 1.1.0) only when using spark "internal" tables structure. Whenever you try to access spark "external" hive tables (i.e. hosted on metastore), you might face similar issues with ArrayType not supported..
Here is a simple illustration
// ************
// ArrayType is supported when playing with SparkSQL temp tables...
// ************
val sqlContext = org.apache.spark.sql.hive.HiveContext(sc)
val rdd = sqlContext.jsonFile("/tmp/test.json")
rdd.printSchema
/*
root
|-- id: integer (nullable = true)
|-- names: array (nullable = true)
| |-- element: string (containsNull = false)
*/
sqlContext.registerRDDAsTable(rdd,"test")
val out = sqlContext.sql("SELECT names FROM test")
// ************
// ...But fail on Hive statements
// ************
sqlContext.sql("CREATE TABLE mytable AS SELECT names FROM test")
/*
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 16, vagrant): scala.MatchError: ArrayType(StringType,true) (of class org.apache.spark.sql.catalyst.types.ArrayType)
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
.../...
*/
I still don't know exactly why / where it fails, but HiveContext does not (fully) support ArrayType. Anyway, I doubt the issue you're describing here is related to your jaccard UDF function..
Alternatively, uses such a (working) ugly hack :)
sqlContext.sql("CREATE TABLE mytable AS SELECT split(concat_ws('#',names),'#') FROM test")
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With