I'm a newbie to Scala and Apache Spark and I'm trying to use Spark SQL. After cloning the repo I started the Spark shell by typing bin/spark-shell and ran the following:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// brings the implicit conversion from RDD to SchemaRDD into scope
import sqlContext.createSchemaRDD
val pathUsers = "users.txt"
case class User(uid: String, name: String, surname: String)
// parse each space-separated line into a User
val users = sc.textFile(pathUsers).map(_.split(" ")).map(u => User(u(0), u(1), u(2)))
users.registerTempTable("users")
val res = sqlContext.sql("SELECT * FROM users")
res.collect().foreach(println)
and everything worked as expected. The users.txt file is something like:
uid-1 name1 surname1
uid-2 name2 surname2
...
After that I tried to create a standalone project, managing the dependencies with sbt. The dependencies listed in build.sbt are the following:
"org.apache.spark" % "spark-streaming_2.10" % "1.2.0",
"org.apache.spark" % "spark-streaming-kafka_2.10" % "1.2.0",
"org.apache.spark" % "spark-sql_2.10" % "1.2.0",
"org.apache.spark" % "spark-catalyst_2.10" % "1.2.0"
If I run the same instructions, it crashes on this line:
users.registerTempTable("users")
with this error:
scala.reflect.internal.MissingRequirementError: class org.apache.spark.sql.catalyst.ScalaReflection in JavaMirror with java.net.URLClassLoader@56352b57 of type class java.net.URLClassLoader with classpath [file:/Users/se7entyse7en/.sbt/boot/scala-2.10.4/lib/jansi.jar,file:/Users/se7entyse7en/.sbt/boot/scala-2.10.4/lib/jline.jar,file:/Users/se7entyse7en/.sbt/boot/scala-2.10.4/lib/scala-compiler.jar,file:/Users/se7entyse7en/.sbt/boot/scala-2.10.4/lib/scala-library.jar,file:/Users/se7entyse7en/.sbt/boot/scala-2.10.4/lib/scala-reflect.jar] and parent being xsbt.boot.BootFilteredLoader@599e80b1 of type class xsbt.boot.BootFilteredLoader with classpath [<unknown>] and parent being sun.misc.Launcher$AppClassLoader@76d4d81 of type class sun.misc.Launcher$AppClassLoader with classpath [file:/usr/local/Cellar/sbt/0.13.5/libexec/sbt-launch.jar] and parent being sun.misc.Launcher$ExtClassLoader@18fb53f6 of type class sun.misc.Launcher$ExtClassLoader with classpath [file:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/ext/dnsns.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/ext/localedata.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/ext/sunec.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar,file:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/ext/zipfs.jar,file:/System/Library/Java/Extensions/MRJToolkit.jar] and parent being primordial classloader with boot classpath [/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/sunrsasign.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/lib/JObjC.jar:/Library/Java/JavaVirtualMachines/jdk1.7.0_45.jdk/Contents/Home/jre/classes] not found.
at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:115)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
at scala.reflect.api.TypeTags$class.typeOf(TypeTags.scala:335)
at scala.reflect.api.Universe.typeOf(Universe.scala:59)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:115)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:100)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:33)
at org.apache.spark.sql.catalyst.ScalaReflection$class.attributesFor(ScalaReflection.scala:94)
at org.apache.spark.sql.catalyst.ScalaReflection$.attributesFor(ScalaReflection.scala:33)
at org.apache.spark.sql.SQLContext.createSchemaRDD(SQLContext.scala:111)
at .<init>(<console>:20)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
at scala.tools.nsc.interpreter.ILoop.main(ILoop.scala:904)
at xsbt.ConsoleInterface.run(ConsoleInterface.scala:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sbt.compiler.AnalyzingCompiler.call(AnalyzingCompiler.scala:102)
at sbt.compiler.AnalyzingCompiler.console(AnalyzingCompiler.scala:77)
at sbt.Console.sbt$Console$$console0$1(Console.scala:23)
at sbt.Console$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Console.scala:24)
at sbt.Console$$anonfun$apply$2$$anonfun$apply$1.apply(Console.scala:24)
at sbt.Console$$anonfun$apply$2$$anonfun$apply$1.apply(Console.scala:24)
at sbt.Logger$$anon$4.apply(Logger.scala:90)
at sbt.TrapExit$App.run(TrapExit.scala:244)
at java.lang.Thread.run(Thread.java:744)
What is the problem?
UPDATE:
Ok, I don't think the problem is Spark SQL but Spark itself, as I'm not even able to perform users.collect(). If the same code is run in the Spark shell, the result is:
res5: Array[User] = Array(User(uid-1,name1,surname1), User(uid-2,name2,surname2))
as expected. The error is the following:
15/01/08 09:47:02 INFO FileInputFormat: Total input paths to process : 1
15/01/08 09:47:02 INFO SparkContext: Starting job: collect at <console>:19
15/01/08 09:47:02 INFO DAGScheduler: Got job 0 (collect at <console>:19) with 2 output partitions (allowLocal=false)
15/01/08 09:47:02 INFO DAGScheduler: Final stage: Stage 0(collect at <console>:19)
15/01/08 09:47:02 INFO DAGScheduler: Parents of final stage: List()
15/01/08 09:47:02 INFO DAGScheduler: Missing parents: List()
15/01/08 09:47:02 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at <console>:17), which has no missing parents
15/01/08 09:47:02 INFO MemoryStore: ensureFreeSpace(2840) called with curMem=157187, maxMem=556038881
15/01/08 09:47:02 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.8 KB, free 530.1 MB)
15/01/08 09:47:02 INFO MemoryStore: ensureFreeSpace(2002) called with curMem=160027, maxMem=556038881
15/01/08 09:47:02 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2002.0 B, free 530.1 MB)
15/01/08 09:47:02 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.100.195:63917 (size: 2002.0 B, free: 530.3 MB)
15/01/08 09:47:02 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
15/01/08 09:47:02 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
15/01/08 09:47:02 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[3] at map at <console>:17)
15/01/08 09:47:02 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/01/08 09:47:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 192.168.100.195, PROCESS_LOCAL, 1326 bytes)
15/01/08 09:47:02 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 192.168.100.195, PROCESS_LOCAL, 1326 bytes)
15/01/08 09:47:02 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 192.168.100.195): java.io.EOFException
at java.io.ObjectInputStream$BlockDataInputStream.readFully(ObjectInputStream.java:2744)
at java.io.ObjectInputStream.readFully(ObjectInputStream.java:1032)
at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
at org.apache.hadoop.io.UTF8.readChars(UTF8.java:216)
at org.apache.hadoop.io.UTF8.readString(UTF8.java:208)
at org.apache.hadoop.mapred.FileSplit.readFields(FileSplit.java:87)
at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:237)
at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:66)
at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:43)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
I also found this: java.io.EOFException on Spark EC2 Cluster when submitting a job programmatically, but I don't know which version of hadoop-client could be required.
This problem can be fixed by adding fork := true to the sbt project settings, as in the sketch below.
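A minimal sketch of the setting, assuming a single-module build.sbt:
// build.sbt: run the application in a separate, freshly spawned JVM
// rather than inside sbt's own JVM
fork := true
Forking means the code starts with a plain application classpath, rather than under sbt's layered classloaders (note the xsbt.boot.BootFilteredLoader in the stack trace above), which appears to be what keeps scala-reflect from finding org.apache.spark.sql.catalyst.ScalaReflection.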
See: http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-1-2-0-MissingRequirementError-td10123.html
Other useful settings can be found in the referenced project file:
https://github.com/deanwampler/spark-workshop/blob/master/project/Build.scala