Encoder for Row Type Spark Datasets

I would like to write an encoder for a Row type in DataSet, for a map operation that I am doing. Essentially, I do not understand how to write encoders.

Below is an example of a map operation:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

    Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
        @Override
        public Iterator<String> call(Row row) throws Exception {
            ArrayList<String> obj = // some map operation
            return obj.iterator();
        }
    }, Encoders.STRING());

I understand that instead of a string Encoder needs to be written as follows:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };

However, I do not understand the clsTag() method in the encoder, and I am trying to find a running example that demonstrates something similar (i.e. an encoder for a Row type).
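For what it's worth, a ClassTag for Row can be obtained from Java through Scala's ClassTag$ companion object. Below is a minimal sketch of the anonymous encoder above with a real ClassTag instead of null; this is illustrative only, since a hand-written Encoder like this is generally not enough on its own (Spark internally expects the serializer/deserializer machinery of an ExpressionEncoder):

    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
        }

        @Override
        public ClassTag<Row> clsTag() {
            // Scala's ClassTag companion object, callable from Java
            return ClassTag$.MODULE$.apply(Row.class);
        }
    };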

Edit - This is not a duplicate of the question mentioned: Encoder error while trying to map dataframe row to updated row. The answer there is about using Spark 1.x APIs in Spark 2.x (I am not doing so), and I am looking for an encoder for the Row class rather than a fix for an error. Finally, I was looking for a solution in Java, not in Scala.

asked Apr 05 '17 by tsar2512

People also ask

What is row encoder?

RowEncoder is part of the Encoder framework and acts as the encoder for DataFrames, i.e. Dataset[Row], Datasets of Rows. Note that the DataFrame type is a mere type alias for Dataset[Row] that expects an Encoder[Row] to be available in scope, which is indeed RowEncoder itself.
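A minimal Java sketch of getting that Encoder[Row] for a given schema (the column names here are made up):

    // RowEncoder supplies the Encoder<Row> behind every Dataset<Row> / DataFrame
    StructType schema = new StructType()
        .add("name", DataTypes.StringType, true)
        .add("age", DataTypes.IntegerType, true);

    ExpressionEncoder<Row> rowEncoder = RowEncoder.apply(schema);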

What is Dataset row in Spark?

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row. Operations available on Datasets are divided into transformations and actions.
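In Java that distinction looks like this (file path and column name are placeholders):

    Dataset<Row> df = spark.read().json("people.json");            // a DataFrame, i.e. Dataset<Row>
    Dataset<Row> adults = df.filter(functions.col("age").geq(18)); // transformation: lazy
    long numAdults = adults.count();                               // action: triggers execution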

What is an encoder in Spark?

Encoders are part of Spark's Tungsten framework. Because the encoded data is backed by raw memory, updating or querying the relevant information from the encoded binary representation is done via Java Unsafe APIs. Spark provides a generic Encoder interface and a generic implementation of that interface called ExpressionEncoder.
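For instance, the Encoders factory returns ExpressionEncoder instances for common cases (Person here is a hypothetical JavaBean):

    Encoder<String> stringEncoder = Encoders.STRING();         // built-in encoder for a primitive type
    Encoder<Person> beanEncoder = Encoders.bean(Person.class); // encoder derived from JavaBean reflection
    Dataset<Person> people = df.as(beanEncoder);               // typed view over an untyped DataFrame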


2 Answers

The answer is to use a RowEncoder together with the dataset's schema, expressed as a StructType.

Below is a working example of a flatMap operation on Datasets:

    import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;

    StructType structType = new StructType();
    structType = structType.add("id1", DataTypes.LongType, false);
    structType = structType.add("id2", DataTypes.LongType, false);

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row row) throws Exception {
            // a static map operation to demonstrate
            List<Object> data = new ArrayList<>();
            data.add(1L);
            data.add(2L);
            ArrayList<Row> list = new ArrayList<>();
            list.add(RowFactory.create(data.toArray()));
            return list.iterator();
        }
    }, encoder);
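A note for newer Spark releases: the RowEncoder.apply(StructType) entry point has moved around across versions; as far as I know, from Spark 3.5 onward the public way to get a Row encoder is Encoders.row:

    // Spark 3.5+ (assumed; check your version's API docs)
    Encoder<Row> encoder = Encoders.row(structType);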
answered Sep 22 '22 by tsar2512


I had the same problem... Encoders.kryo(Row.class) worked for me.

As a bonus, the Apache Spark tuning docs recommend Kryo since it is faster at serialization, "often as much as 10x":

https://spark.apache.org/docs/latest/tuning.html
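A sketch of wiring that into the question's flatMap. One caveat, as far as I can tell: a Kryo-encoded Dataset<Row> is stored as a single opaque binary column rather than with the row's real schema, so column-level operations on the result won't work:

    Encoder<Row> kryoEncoder = Encoders.kryo(Row.class);

    Dataset<Row> output = join.flatMap(
        (FlatMapFunction<Row, Row>) row -> {
            ArrayList<Row> list = new ArrayList<>();
            list.add(row); // identity mapping, just to demonstrate
            return list.iterator();
        },
        kryoEncoder);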

answered Sep 24 '22 by Jim Bob