I would like to write an encoder for a Row type in Dataset, for a map operation that I am doing. Essentially, I do not understand how to write encoders.
Below is an example of a map operation; instead of returning Dataset<String>, I would like to return Dataset<Row>:
Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
    @Override
    public Iterator<String> call(Row row) throws Exception {
        ArrayList<String> obj = //some map operation
        return obj.iterator();
    }
}, Encoders.STRING());
I understand that, instead of Encoders.STRING(), an Encoder<Row> needs to be written, something like the following:
Encoder<Row> encoder = new Encoder<Row>() {
    @Override
    public StructType schema() {
        return join.schema();
        //return null;
    }

    @Override
    public ClassTag<Row> clsTag() {
        return null;
    }
};
However, I do not understand the clsTag() in the encoder, and I am trying to find a running example which demonstrates something similar (i.e. an encoder for a Row type).
Edit - This is not a copy of the question mentioned: Encoder error while trying to map dataframe row to updated row. That answer talks about using Spark 1.x in Spark 2.x (which I am not doing), and I am looking for an encoder for the Row class rather than a fix for an error. Finally, I was looking for a solution in Java, not in Scala.
RowEncoder is part of the Encoder framework and acts as the encoder for DataFrames, i.e. Dataset[Row], Datasets of Rows. Note that the DataFrame type is merely a type alias for Dataset[Row], which expects an Encoder[Row] to be available in scope; that encoder is RowEncoder itself.
A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row. Operations available on Datasets are divided into transformations and actions.
Encoders are part of Spark's Tungsten framework. Because the encoded data is backed by raw memory, updating or querying the relevant information in the encoded binary data is done via the Java Unsafe APIs. Spark provides a generic Encoder interface and a generic implementation of that interface called ExpressionEncoder.
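As a side note on the clsTag() question above: every built-in encoder already carries both a schema and a Scala ClassTag, which is why you rarely need to implement Encoder by hand. A minimal sketch (the class name EncoderBasics is made up for illustration; it only assumes the spark-sql dependency on the classpath):

```java
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public class EncoderBasics {
    public static void main(String[] args) {
        // Factory methods on Encoders; each returns an ExpressionEncoder
        // under the hood, carrying both a schema and a serializer.
        Encoder<String> stringEnc = Encoders.STRING();
        Encoder<Long> longEnc = Encoders.LONG();

        // The encoder knows the Spark SQL schema of the type it encodes:
        System.out.println(stringEnc.schema());
        // ...and the ClassTag that tells Spark which JVM class it produces:
        System.out.println(stringEnc.clsTag());
    }
}
```

This is why returning null from clsTag() in a hand-written Encoder does not work well: Spark uses the ClassTag internally to construct typed arrays and deserializers.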
The answer is to use a RowEncoder, together with the schema of the dataset expressed as a StructType.
Below is a working example of a flatMap operation with Datasets:
StructType structType = new StructType();
structType = structType.add("id1", DataTypes.LongType, false);
structType = structType.add("id2", DataTypes.LongType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public Iterator<Row> call(Row row) throws Exception {
        // a static map operation to demonstrate
        List<Object> data = new ArrayList<>();
        data.add(1L);
        data.add(2L);
        ArrayList<Row> list = new ArrayList<>();
        list.add(RowFactory.create(data.toArray()));
        return list.iterator();
    }
}, encoder);
I had the same problem... using
Encoders.kryo(Row.class)
worked for me.
As a bonus, the Apache Spark tuning docs refer to Kryo since it is faster at serialization, "often as much as 10x":
https://spark.apache.org/docs/latest/tuning.html
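A minimal sketch of the Kryo approach (the session setup and input dataset are made up for illustration; it assumes a local Spark runtime). One caveat worth knowing: Encoders.kryo serializes each Row as an opaque binary blob, so the resulting Dataset has a single binary column rather than the original schema. That is fine for passing rows through a pipeline, but you cannot query individual fields of the result with Spark SQL afterwards:

```java
import java.util.Collections;
import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KryoRowExample {
    public static void main(String[] args) {
        // Local session purely for illustration.
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("kryo-row")
                .getOrCreate();

        Dataset<Row> input = spark.range(3).toDF("id");

        // Kryo stores each Row as one binary "value" column, so no
        // StructType has to be declared up front.
        Dataset<Row> output = input.flatMap(
                (FlatMapFunction<Row, Row>) row ->
                        Collections.singletonList(row).iterator(),
                Encoders.kryo(Row.class));

        output.printSchema(); // a single binary column, not id1/id2
        spark.stop();
    }
}
```

If you need the result to keep a queryable schema, the RowEncoder approach in the accepted answer is the better fit; Kryo trades the schema away for convenience and serialization speed.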