I have a little problem joining two datasets in Spark. I have this:
SparkConf conf = new SparkConf()
.setAppName("MyFunnyApp")
.setMaster("local[*]");
SparkSession spark = SparkSession
.builder()
.config(conf)
.config("spark.debug.maxToStringFields", 150)
.getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile1)
.as(encoderObject1);
Dataset<MyOwnObject2> object2DS = spark.read()
.option("header","true")
.option("delimiter",";")
.option("inferSchema","true")
.csv(pathToFile2)
.as(encoderObject2);
I can print the schema and show it correctly.
//Here starts the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS =
object1DS.join(object2DS, object1DS.col("column01")
.equalTo(object2DS.col("column01")))
.as(Encoders.tuple(encoderObject1, encoderObject2));
The last line can't perform the join and gives me this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;
That's true, because a Tuple2 (object2) doesn't have all those vars...
Then I tried this:
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
.joinWith(object2DS, object1DS
.col("column01")
.equalTo(object2DS.col("column01")));
And it works fine! But I need a new Dataset without the tuple; I have an object3 that has some vars from object1 and object2, and then I have this problem:
Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
//...
//Sets data from object 1 and 2 to 3.
//...
return myOwnObject3;
}, encoderObject3);
It fails!... Here is the error:
17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import
and over thousands error lines...
What can I do?
I want to use Datasets, to get the speed of DataFrames and the object syntax of JavaRDD...
Help?
Thanks
Finally I found a solution.
I had a problem with the inferSchema option when my code was creating the Datasets. I have a String column that inferSchema turns into an Integer column because all its values are "numeric", but I need to use them as Strings (like "0001", "0002"...). I needed to define a schema, but I have many vars, so I wrote this with all my classes:
List<StructField> fieldsObject1 = new ArrayList<>();
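// For each declared field of the bean, build a nullable StructField whose type is
// parsed from the Java type's simple name (e.g. "String", "Integer").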
for (Field field : MyOwnObject1.class.getDeclaredFields()) {
fieldsObject1.add(DataTypes.createStructField(
field.getName(),
CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
true)
);
}
StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.schema(schemaObject1)
.csv(pathToFile1)
.as(encoderObject1);
Works fine.
The "best" solution would be this:
Dataset<MyOwnObject1> object1DS = spark.read()
.option("header","true")
.option("delimiter",";")
.schema(encoderObject1.schema())
.csv(pathToFile1)
.as(encoderObject1);
but encoderObject1.schema() returns the schema with the vars in alphabetical order, not in the original order, so this option fails when I read the csv. Maybe Encoders should return a schema with the vars in the original order instead of alphabetical order.
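A possible workaround (just a sketch; the csvColumnOrder array and the column names in it are hypothetical and must match the real header of the file) would be to take the fields from encoderObject1.schema() and reorder them to match the CSV header before reading:
import java.util.Arrays;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Hypothetical header order of the CSV file; replace with the real column order.
String[] csvColumnOrder = {"column01", "column02", "column03"};

// Bean encoders list their fields alphabetically, so pick each field out by name
// in the order the file actually uses.
StructType beanSchema = encoderObject1.schema();
StructField[] orderedFields = Arrays.stream(csvColumnOrder)
    .map(name -> beanSchema.fields()[beanSchema.fieldIndex(name)])
    .toArray(StructField[]::new);
StructType csvSchema = new StructType(orderedFields);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header", "true")
    .option("delimiter", ";")
    .schema(csvSchema)
    .csv(pathToFile1)
    .as(encoderObject1);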