I have a Java client class (used as a dependency jar with spark-shell) that responds to an API call - let's call the class SomeAPIRequester.
In plain Java, it returns the desired result with the following sample code -
SomeAPIRequester requester = SomeAPIRequester.builder().name("abc").build(); // build the class
System.out.println(requester.getSomeItem("id123")); // result: {"id123": "item123"}
I want to call this API in a distributed manner over an RDD of IDs stored in a Spark dataframe (in Scala) -
val inputIdRdd = sc.parallelize(List("id1", "id2", "id3"...)) // sample RDD of IDs i want to call the API for
and I define my UDF as -
val test: UserDefinedFunction = udf((id: String, requester: SomeAPIRequester) => {
  requester.getSomeItem(id)
})
and call this UDF as -
inputIdRdd.toDF("ids").withColumn("apiResult", test(col("ids"), requester)) // requester as built with SomeAPIRequester.builder()....
// or directly on the RDD, with a udf or a plain Scala function:
inputIdRdd.foreach{ id => test(id, requester) }
When I run a .show() or .take() on the result, I get a NullPointerException on the requester Java class.
I also tried passing it in as a literal (lit), and I read about typedLit in Scala, but I could not convert the Java requester class into any of the types typedLit allows.
Is there a way to call this Java class object through UDFs and get the result from the API?
I also tried to initialize the requester class in the RDD's foreach block -
inputIdRdd.foreach { x =>
  val apiRequester = SomeAPIRequester.builder()...(argPool).build()
  try {
    apiRequester.getSomeItem(x)
  } catch {
    case ex: Exception => ex.printStackTrace(); ""
  }
}
But this returns no response - I get "cannot initialize class" errors etc.
Thanks!
Working with custom classes in Spark requires some knowledge of how Spark works under the hood. Don't pass your instance as a parameter to the udf: udf parameters are extracted from the rows of the dataframe, so the NullPointerException is understandable in this case. You can try the following options:
First put the instance in the scope of the udf:
val requester: SomeAPIRequester = ???
val test: UserDefinedFunction = udf((id: String) => {
  requester.getSomeItem(id)
})
At this point you will need to mark your class as Serializable if possible; otherwise you will get a NotSerializableException.
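Putting it together, a minimal sketch of this approach (assuming SomeAPIRequester is serializable and that `spark`/`sc` are the usual spark-shell handles):

```scala
import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

// Built once on the driver; captured by the udf closure and
// serialized out to the executors along with the task.
val requester: SomeAPIRequester = SomeAPIRequester.builder().name("abc").build()

val test = udf((id: String) => requester.getSomeItem(id))

val df = sc.parallelize(List("id1", "id2", "id3")).toDF("ids")
df.withColumn("apiResult", test(col("ids"))).show()
```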
If your class is not Serializable because it comes from a third party, you can declare your instance as a lazy transient val, as described in https://mengdong.github.io/2016/08/16/spark-serialization-memo/ or https://medium.com/@swapnesh.chaubal/writing-to-logentries-from-apache-spark-35831282f53d.
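A sketch of that pattern (RequesterHolder is a hypothetical wrapper; @transient keeps the field out of serialization, and lazy rebuilds the instance on first use in each executor JVM instead of shipping it from the driver):

```scala
object RequesterHolder extends Serializable {
  // Not serialized; constructed lazily on each JVM that touches it.
  @transient lazy val requester: SomeAPIRequester =
    SomeAPIRequester.builder().name("abc").build()
}

val test = udf((id: String) => RequesterHolder.requester.getSomeItem(id))
```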
If you work in the RDD world, you can use mapPartitions to create just one instance per partition.
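For example (a sketch: the requester is built once per partition, on the executor, rather than once per record or on the driver):

```scala
val results = inputIdRdd.mapPartitions { ids =>
  // Runs on the executor; one instance serves every id in this partition.
  val apiRequester = SomeAPIRequester.builder().name("abc").build()
  ids.map(id => (id, apiRequester.getSomeItem(id)))
}
results.collect().foreach(println)
```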