UDF not working in Spark SQL

I'm trying to calculate Jaccard index on Spark SQL. My table on Hive has the following data:

hive> select * from test_1;

1   ["rock","pop"]
2   ["metal","rock"]

Table DDL:

create table test_1
(id int, val array<string>);
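
For reference, the Jaccard index of two sets is |A ∩ B| / |A ∪ B|. The two rows above, ["rock","pop"] and ["metal","rock"], share one element out of three distinct ones, so the expected similarity is 1/3.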

I'm using the SetSimilarityUDF from Brickhouse. From spark-shell, I'm able to execute the following commands to create the temporary function:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.hql("CREATE TEMPORARY FUNCTION jaccard_similarity AS 'brickhouse.udf.sketch.SetSimilarityUDF'")

I have also added the .jar file to the CLASSPATH for spark-shell (in compute-classpath.sh).
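
I believe the jar can also be registered from inside the session itself, which should rule out classpath issues; the path below is just a placeholder for wherever the Brickhouse jar actually lives.

// Possible alternative to editing compute-classpath.sh: register the jar
// through a native Hive command. Placeholder path, adjust as needed.
hiveContext.hql("ADD JAR /path/to/brickhouse.jar")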

When I list the functions, I'm able to see the new function that I have created.

hiveContext.hql("show functions").collect().foreach(println)

Next, I use the jaccard_similarity function to calculate the Jaccard index of the val arrays:

hiveContext.hql("select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b")

I'm getting the following error:

14/07/31 15:39:56 INFO ParseDriver: Parsing command: select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b
14/07/31 15:39:56 INFO ParseDriver: Parse Completed
14/07/31 15:39:56 INFO Analyzer: Max iterations (2) reached for batch MultiInstanceRelations
14/07/31 15:39:56 INFO Analyzer: Max iterations (2) reached for batch CaseInsensitiveAttributeReferences
14/07/31 15:39:56 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/07/31 15:39:56 INFO audit: ugi=username  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_1   
14/07/31 15:39:56 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/07/31 15:39:56 INFO audit: ugi=username  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_1   
scala.MatchError: ArrayType(StringType) (of class org.apache.spark.sql.catalyst.types.ArrayType)
    at org.apache.spark.sql.hive.HiveInspectors$typeInfoConversions.toTypeInfo(hiveUdfs.scala:382)
    at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
    at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.hive.HiveFunctionRegistry$.lookupFunction(hiveUdfs.scala:52)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:131)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:129)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:52)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:65)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:129)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:127)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:126)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:62)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:60)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:60)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:52)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:52)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:313)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:313)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:248)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:247)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:316)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:316)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:319)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:319)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.simpleString(HiveContext.scala:315)
    at org.apache.spark.sql.SchemaRDDLike$class.toString(SchemaRDDLike.scala:67)
    at org.apache.spark.sql.SchemaRDD.toString(SchemaRDD.scala:100)
    at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
    at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
    at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
    at .<init>(<console>:10)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1056)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:796)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:841)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:753)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:601)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:608)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:611)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:936)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:884)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:884)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:982)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:303)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I looked at the Spark source code on GitHub. In dataTypes.scala, there's the following parser code:

 protected lazy val arrayType: Parser[DataType] =
    "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
      case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
    }

I couldn't find any reference to arrays not being supported by Spark SQL. It would be great if anyone could share pointers on how to get this working.

Also, the function works perfectly from the Hive shell.

Update (5th August):

I just built Spark from the master branch on GitHub. The error message now carries a bit more information: scala.MatchError: ArrayType(StringType,false) instead of scala.MatchError: ArrayType(StringType).

scala> hiveContext.hql("select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b")
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
14/08/05 13:54:53 INFO ParseDriver: Parsing command: select jaccard_similarity(a.val, b.val) from test_1 a join test_1 b
14/08/05 13:54:53 INFO ParseDriver: Parse Completed
14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO audit: ugi=chandrv1  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_1   
14/08/05 13:54:53 INFO HiveMetaStore: 0: get_table : db=default tbl=test_1
14/08/05 13:54:53 INFO audit: ugi=chandrv1  ip=unknown-ip-addr  cmd=get_table : db=default tbl=test_1   
scala.MatchError: ArrayType(StringType,false) (of class org.apache.spark.sql.catalyst.types.ArrayType)
    at org.apache.spark.sql.hive.HiveInspectors$typeInfoConversions.toTypeInfo(HiveInspectors.scala:216)
    at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
    at org.apache.spark.sql.hive.HiveFunctionRegistry$$anonfun$2.apply(hiveUdfs.scala:52)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.hive.HiveFunctionRegistry.lookupFunction(hiveUdfs.scala:52)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.org$apache$spark$sql$catalyst$analysis$OverrideFunctionRegistry$$super$lookupFunction(HiveContext.scala:253)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$$anonfun$lookupFunction$2.apply(FunctionRegistry.scala:41)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.catalyst.analysis.OverrideFunctionRegistry$class.lookupFunction(FunctionRegistry.scala:41)
    at org.apache.spark.sql.hive.HiveContext$$anon$3.lookupFunction(HiveContext.scala:253)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:131)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5$$anonfun$applyOrElse$3.applyOrElse(Analyzer.scala:129)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:183)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:212)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:168)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionDown$1(QueryPlan.scala:52)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1$$anonfun$apply$1.apply(QueryPlan.scala:66)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:65)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:41)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:129)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$5.applyOrElse(Analyzer.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:127)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$.apply(Analyzer.scala:126)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)
    at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)
    at scala.collection.immutable.List.foldLeft(List.scala:84)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:394)
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:394)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:350)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:349)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:399)
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:397)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:403)
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:403)
    at org.apache.spark.sql.hive.HiveContext$QueryExecution.simpleString(HiveContext.scala:419)
    at org.apache.spark.sql.SchemaRDDLike$class.toString(SchemaRDDLike.scala:67)
    at org.apache.spark.sql.SchemaRDD.toString(SchemaRDD.scala:103)
    at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:324)
    at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:329)
    at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:337)
    at .<init>(<console>:10)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:788)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1061)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:614)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:645)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:609)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:314)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:73)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I also looked at HiveInspectors.scala (line 212, typeInfoConversions). It seems that ArrayType isn't handled there.
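
If that's the root cause, I'd guess the fix is to add an ArrayType case to that conversion, mapping it onto Hive's list TypeInfo. Purely as an untested sketch (not code from the Spark tree), I'd imagine something along these lines, using TypeInfoFactory.getListTypeInfo from Hive's serde2 API:

// Hypothetical extra case inside typeInfoConversions.toTypeInfo; untested.
// Maps Catalyst's ArrayType (element type plus the containsNull flag on
// master) onto Hive's list TypeInfo.
case ArrayType(elementType, _) =>
  TypeInfoFactory.getListTypeInfo(elementType.toTypeInfo)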

asked Jul 31 '14 by visakh
1 Answer

Sorry, my reputation on Stack Overflow is not high enough to leave a comment, so I hope this "answer" reaches you well. I've been playing with Spark SQL using HiveContext and noticed quite similar behaviour with ArrayType. Although this does not solve your problem, it might explain why it happens.

It turns out that ArrayType is supported in HiveContext (Spark 1.1.0) only when using Spark's "internal" table structures. Whenever you try to access "external" Hive tables (i.e. hosted on the metastore), you may face similar issues with ArrayType not being supported.

Here is a simple illustration:

// ************
// ArrayType is supported when playing with SparkSQL temp tables...
// ************

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val rdd = sqlContext.jsonFile("/tmp/test.json")
rdd.printSchema
/*
    root
    |-- id: integer (nullable = true)
    |-- names: array (nullable = true)
    |    |-- element: string (containsNull = false)
*/

sqlContext.registerRDDAsTable(rdd,"test")
val out = sqlContext.sql("SELECT names FROM test")

// ************
// ...But fail on Hive statements
// ************

sqlContext.sql("CREATE TABLE mytable AS SELECT names FROM test")

/*
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 5.0 failed 4 times, most recent failure: Lost task 0.3 in stage 5.0 (TID 16, vagrant): scala.MatchError: ArrayType(StringType,true) (of class org.apache.spark.sql.catalyst.types.ArrayType)
    org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247)
    org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247)
    org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263)
.../...
*/

I still don't know exactly why or where it fails, but HiveContext does not (fully) support ArrayType. Anyway, I doubt the issue you're describing here is related to your jaccard_similarity UDF itself.

Alternatively, use this (working) ugly hack :)

sqlContext.sql("CREATE TABLE mytable AS SELECT split(concat_ws('#',names),'#') FROM test")
answered Nov 15 '22 by Antoine Amend