I am trying to write a Spark job that should put its output into HBase. As far as I can tell, the right way to do this is to use the method saveAsHadoopDataset on org.apache.spark.rdd.PairRDDFunctions, which requires that my RDD is composed of pairs.
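The "pairs only" restriction comes from where the method lives: saveAsHadoopDataset is defined on PairRDDFunctions, which Spark makes available on an RDD[(K, V)] through an implicit conversion. The toy, Spark-free model below shows that mechanism. FakeRDD, FakePairFunctions and saveAsFakeDataset are hypothetical names, not Spark API. With the old-API HBase TableOutputFormat, the real pairs would typically be (ImmutableBytesWritable, Put).

```scala
object PairOpsDemo {
  // Hypothetical stand-in for an RDD; it just wraps a local Seq.
  final case class FakeRDD[T](data: Seq[T])

  // Hypothetical stand-in for PairRDDFunctions. As an implicit class it only
  // applies to a FakeRDD whose elements are pairs (K, V).
  implicit class FakePairFunctions[K, V](rdd: FakeRDD[(K, V)]) {
    // Pretends to write each key/value pair to a sink; here it renders them.
    def saveAsFakeDataset(): Seq[String] =
      rdd.data.map { case (k, v) => s"$k -> $v" }
  }

  val pairs: FakeRDD[(String, Int)] = FakeRDD(Seq("row1" -> 1, "row2" -> 2))
  val saved: Seq[String] = pairs.saveAsFakeDataset()

  // Does not compile: a FakeRDD[String] holds no pairs, so the implicit class
  // does not apply and saveAsFakeDataset is not a member.
  // FakeRDD(Seq("row1", "row2")).saveAsFakeDataset()

  def main(args: Array[String]): Unit = saved.foreach(println)
}
```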
The method saveAsHadoopDataset requires a JobConf, and this is what I am trying to construct. According to this link, one thing I have to set on my JobConf is the output format (in fact, it doesn't work without it), like
jobConfig.setOutputFormat(classOf[TableOutputFormat])
The problem is that apparently this does not compile, because TableOutputFormat is generic, even though it ignores its type parameter. So I have tried various combinations, such as
jobConfig.setOutputFormat(classOf[TableOutputFormat[Unit]])
jobConfig.setOutputFormat(classOf[TableOutputFormat[_]])
but in both cases I get the error
required: Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]]
Now, as far as I can tell, Class[_ <: org.apache.hadoop.mapred.OutputFormat[_, _]] translates to Class[T] forSome { type T <: org.apache.hadoop.mapred.OutputFormat[_, _] }. Here is where I think I have a problem, because:
- Class is invariant,
- TableOutputFormat[T] <: OutputFormat[T, Mutation], but
- the existential requires T <: OutputFormat[_, _].
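The reasoning in the list above can be checked with stand-in types. The sketch below uses hypothetical local types named after the Hadoop ones (they are not the real org.apache.hadoop classes) and suggests that invariance of Class is not the blocker: Class[_ <: OutputFormat[_, _]] only asks for some T under the bound, and TableOutputFormat[Unit] qualifies as long as TableOutputFormat extends the same OutputFormat that appears in the bound.

```scala
object VarianceDemo {
  // Hypothetical local stand-ins, NOT the Hadoop/HBase classes.
  trait OutputFormat[K, V]
  class Mutation
  class TableOutputFormat[K] extends OutputFormat[K, Mutation]

  // Same shape as the required parameter type:
  // Class[_ <: OutputFormat[_, _]], i.e. Class[T] forSome { type T <: OutputFormat[_, _] }
  def accepts(c: Class[_ <: OutputFormat[_, _]]): Class[_ <: OutputFormat[_, _]] = c

  // Invariance of Class really does rule this out:
  // val bad: Class[OutputFormat[_, _]] = classOf[TableOutputFormat[Unit]] // does not compile

  // ...but the existential only needs SOME T under the bound. T = TableOutputFormat[Unit]
  // works because TableOutputFormat[Unit] <: OutputFormat[Unit, Mutation] <: OutputFormat[_, _].
  val withUnit: Class[_ <: OutputFormat[_, _]] = accepts(classOf[TableOutputFormat[Unit]])
  val withWildcard: Class[_ <: OutputFormat[_, _]] = accepts(classOf[TableOutputFormat[_]])

  // Both erase to the same runtime Class object.
  def main(args: Array[String]): Unit = println(withUnit eq withWildcard)
}
```

So when both types come from the same hierarchy the Scala types line up; a failure like the one above points at which OutputFormat the imported TableOutputFormat actually extends.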
Is there a way to obtain a subtype of OutputFormat[_, _] from TableOutputFormat? The problem seems to arise from the differences between generics in Java and in Scala; what can I do about this?
Edit:
It turns out this is even subtler. I tried defining a method myself in the REPL
def foo(x: Class[_ <: OutputFormat[_, _]]) = x
and I can actually invoke it with
foo(classOf[TableOutputFormat[Unit]])
or even
foo(classOf[TableOutputFormat[_]])
for that matter. But I cannot call
jobConf.setOutputFormat(classOf[TableOutputFormat[_]])
The original Java signature of setOutputFormat is void setOutputFormat(Class<? extends OutputFormat> theClass). How can I call it from Scala?
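The REPL experiment is the real clue: foo compiles because, in the REPL, OutputFormat presumably resolved to the same type that TableOutputFormat extends, whereas setOutputFormat insists on org.apache.hadoop.mapred.OutputFormat. HBase ships two TableOutputFormat classes: org.apache.hadoop.hbase.mapreduce.TableOutputFormat, which is generic and extends the new-API org.apache.hadoop.mapreduce.OutputFormat, and org.apache.hadoop.hbase.mapred.TableOutputFormat, which is not generic and is a subtype of the old-API org.apache.hadoop.mapred.OutputFormat that JobConf expects. The pure-Scala sketch below models that situation with hypothetical stand-in types (none of them are the real Hadoop classes) to show why one call compiles and the other does not.

```scala
object TwoHierarchiesDemo {
  // Hypothetical stand-ins for the two unrelated Hadoop APIs; not the real classes.
  object mapred {
    trait OutputFormat[K, V] // plays org.apache.hadoop.mapred.OutputFormat (old API)
    final class JobConf {
      private var outputFormat: Option[Class[_]] = None
      // Scala's view of Java's: void setOutputFormat(Class<? extends OutputFormat> theClass)
      def setOutputFormat(theClass: Class[_ <: OutputFormat[_, _]]): Unit = {
        outputFormat = Some(theClass)
      }
      def getOutputFormat: Option[Class[_]] = outputFormat
    }
  }
  object mapreduce {
    abstract class OutputFormat[K, V] // plays org.apache.hadoop.mapreduce.OutputFormat (new API)
  }
  class ImmutableBytesWritable
  class Put
  class Mutation

  // Plays org.apache.hadoop.hbase.mapreduce.TableOutputFormat: generic, new API.
  class NewApiTableOutputFormat[K] extends mapreduce.OutputFormat[K, Mutation]
  // Plays org.apache.hadoop.hbase.mapred.TableOutputFormat: not generic, old API.
  class OldApiTableOutputFormat extends mapred.OutputFormat[ImmutableBytesWritable, Put]

  // A helper written against the new-API OutputFormat accepts the new-API class...
  def foo(x: Class[_ <: mapreduce.OutputFormat[_, _]]) = x
  val viaFoo = foo(classOf[NewApiTableOutputFormat[_]])

  val jobConf = new mapred.JobConf
  // ...but JobConf wants the old-API OutputFormat, so this line does not compile:
  // jobConf.setOutputFormat(classOf[NewApiTableOutputFormat[_]])
  jobConf.setOutputFormat(classOf[OldApiTableOutputFormat]) // compiles

  def main(args: Array[String]): Unit = println(jobConf.getOutputFormat)
}
```

With the real libraries this suggests importing org.apache.hadoop.hbase.mapred.TableOutputFormat when working with a JobConf; the generic mapreduce class instead goes with the new-API Job configuration and saveAsNewAPIHadoopDataset.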
That's very strange. Are you 100% sure your imports are correct (EDIT: yes, this was the problem, see comments) and that you have the correct versions of the artefacts in your build file? Maybe a code snippet from my working project will help:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
// The old-API (mapred) TableOutputFormat, not org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.mapred.TableOutputFormat
val conf = HBaseConfiguration.create()
val jobConfig: JobConf = new JobConf(conf, this.getClass)
jobConfig.setOutputFormat(classOf[TableOutputFormat])
// outputTable is the name of the target HBase table, defined elsewhere
jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
And some of the dependencies I have:
"org.apache.hadoop" % "hadoop-client" % "2.3.0-mr1-cdh5.0.0",
"org.apache.hbase" % "hbase-client" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-common" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-hadoop-compat" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-it" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-hadoop2-compat" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-prefix-tree" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-protocol" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-server" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-shell" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-testing-util" % "0.96.1.1-cdh5.0.0",
"org.apache.hbase" % "hbase-thrift" % "0.96.1.1-cdh5.0.0",