 

Creating a new Scala class that relies on GraphFrames without serialization issues

I am trying to write a Scala class that builds on Spark's GraphFrames. The GraphFrame class is defined here. The GraphFrame class extends Serializable. I would like to write a class that extends GraphFrame and computes some additional graph properties. To keep this example simple, I have created a class that doesn't contain any functions; all it does is extend GraphFrame:

import org.apache.spark.sql.DataFrame
import org.graphframes._
import spark.implicits._ // provides .toDF; spark-shell imports this automatically

class NewGraphFrame(@transient private val _vertices: DataFrame,
                    @transient private val _edges: DataFrame) extends GraphFrame {

}

val vertices = Seq(
  (1, "John"),
  (2, "Jane"),
  (3, "Karen")
).toDF("id", "name")

val edges = Seq(
  (1, 3),
  (2, 3),
  (2, 1)
).toDF("src", "dst")

val g = new NewGraphFrame(vertices, edges)

When I run this code in the REPL, I get the following error:

java.lang.Exception: You cannot use GraphFrame objects within a Spark closure
  at org.graphframes.GraphFrame.vertices(GraphFrame.scala:125)
  at org.graphframes.GraphFrame.toString(GraphFrame.scala:55)
  at scala.runtime.ScalaRunTime$.scala$runtime$ScalaRunTime$$inner$1(ScalaRunTime.scala:332)
  at scala.runtime.ScalaRunTime$.stringOf(ScalaRunTime.scala:337)
  at scala.runtime.ScalaRunTime$.replStringOf(ScalaRunTime.scala:345)
  at .$print$lzycompute(<console>:10)
  at .$print(<console>:6)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:786)
  at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1047)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:638)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:637)
  at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
  at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
  at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:637)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:569)
  at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:565)
  at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:807)
  at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:681)
  at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:395)
  at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:415)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:923)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:909)
  at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
  at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:909)
  at org.apache.spark.repl.Main$.doMain(Main.scala:76)
  at org.apache.spark.repl.Main$.main(Main.scala:56)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I understand this error to mean that the object is being serialized somewhere. However, I am not trying to serialize anything. I simply want to extend this class so that I can use the graph functionality in my own class. How do I extend it without the Spark REPL throwing this error?

Michal asked Feb 23 '26 03:02


1 Answer

Interestingly enough, no serialization ever happened, so this isn't really a serialization issue.

The definition of GraphFrame boils down to:

class GraphFrame private(private val _vertices: DataFrame, private val _edges: DataFrame) extends Logging with Serializable

The first private means that the primary constructor is only accessible from within GraphFrame or its companion object. Since _vertices and _edges are private, they're inaccessible from subclasses and cannot be overridden. Your _vertices and _edges fields are entirely separate from the fields of the same name in GraphFrame; the fact that you couldn't prefix them with override demonstrates this.
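The field separation can be seen without Spark at all. In this minimal sketch, Parent and Child are hypothetical classes and a String stands in for a DataFrame: each class reads its own private _vertices, and nothing Child declares changes what Parent sees.

```scala
// Hypothetical classes: Child's private val is a new, unrelated field,
// not an override of Parent's private val with the same name.
class Parent(private val _vertices: String) {
  // Always reads Parent's own field; a subclass cannot redirect this.
  def parentView: String = _vertices
}

class Child(private val _vertices: String) extends Parent("parent's copy") {
  // Reads Child's own field, which merely happens to share the name.
  def childView: String = _vertices
}
```

Here new Child("child's copy").parentView returns "parent's copy", while childView returns "child's copy". This is the same situation NewGraphFrame is in: GraphFrame's methods keep reading GraphFrame's own fields.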

GraphFrame does define a single public zero-arg constructor:

def this() = this(null, null)

Any subclass of GraphFrame will thus have null for GraphFrame._vertices and GraphFrame._edges, which is also what would be there after a GraphFrame had been serialized into a Spark closure. The accessors vertices and edges check for null and throw. The REPL calls toString to display each result, and GraphFrame.toString calls those accessors, which is why the REPL blows up as soon as you create a NewGraphFrame. GraphFrame seems to assume that if those fields are null, serialization was attempted.
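The same chain of events can be reproduced in plain Scala. This is a minimal sketch, not GraphFrames code: BaseFrame and SubFrame are hypothetical stand-ins, Strings replace DataFrames, and the exception message is copied from the question's stack trace.

```scala
// Hypothetical model of GraphFrame's construction rules.
class BaseFrame private (private val _vertices: String, private val _edges: String) {
  // The only constructor a subclass can use; it leaves both fields null.
  def this() = this(null, null)

  private def fail(): Nothing =
    throw new Exception("You cannot use GraphFrame objects within a Spark closure")

  // The accessors treat null as "this object was serialized" and throw.
  def vertices: String = if (_vertices == null) fail() else _vertices
  def edges: String = if (_edges == null) fail() else _edges

  // toString goes through the accessors, as in the stack trace.
  override def toString: String = s"BaseFrame(v:$vertices, e:$edges)"
}

// Mirrors NewGraphFrame: its fields are separate from BaseFrame's null ones.
class SubFrame(private val _vertices: String, private val _edges: String) extends BaseFrame
```

Evaluating new SubFrame("v", "e") in the Scala REPL should fail the same way NewGraphFrame did, since the REPL echoes the new object with toString and toString hits the null check.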

This can be demonstrated in the REPL with:

new NewGraphFrame(vertices, edges).vertices

which throws the same exception directly from GraphFrame.vertices, because GraphFrame's own _vertices field is null.

If you override the vertices method (note the absence of the leading _) so that it returns NewGraphFrame._vertices, and do the same for edges, you'll at least be able to call toString on the new class:

override def vertices: DataFrame = _vertices
override def edges: DataFrame = _edges

You'll be able to reuse an existing GraphFrame method only if it goes through the public vertices/edges accessors; any method that reads _vertices/_edges directly will still see null.
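Putting the pieces together, a complete version of NewGraphFrame might look like the sketch below. It assumes the code is pasted into spark-shell (for example with :paste) with the graphframes package on the classpath, and it assumes that inDegrees is implemented in terms of edges rather than _edges, as it is in the graphframes sources I have seen; check the version you use.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.graphframes.GraphFrame

// The question's NewGraphFrame plus the two accessor overrides.
class NewGraphFrame(@transient private val _vertices: DataFrame,
                    @transient private val _edges: DataFrame) extends GraphFrame {
  // GraphFrame's own private fields stay null in any subclass, so inherited
  // methods only reach this data if they call these public accessors.
  override def vertices: DataFrame = _vertices
  override def edges: DataFrame = _edges
}

// Inside spark-shell this returns the already-running session.
val spark = SparkSession.builder().appName("NewGraphFrameDemo").getOrCreate()
import spark.implicits._

val vertices = Seq((1, "John"), (2, "Jane"), (3, "Karen")).toDF("id", "name")
val edges = Seq((1, 3), (2, 3), (2, 1)).toDF("src", "dst")

val g = new NewGraphFrame(vertices, edges)
println(g)          // GraphFrame.toString now reaches the data via the overrides
g.inDegrees.show()  // inherited method that reads `edges`
```

The overrides only redirect the two accessors, so any inherited method that touches the private fields directly would still need its own override in the subclass.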

Levi Ramsey answered Feb 25 '26 13:02