Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark: subtract values in same DataSet row

Given the following dataset:

| title | start | end
| bla   | 10    | 30

I would like to find the difference (end - start) between the two numbers and set it into a new column, so that it looks like:

| title | time_spent |
 | bla   | 20 |

The data is of type Dataset<Row>
dataset = dataset.withColumn("millis spent: ", col("end") - col("start")).as("Time spent");
I expected this to work, as I saw in this question, but it does not — maybe because that thread is about DataFrames and not Datasets, or maybe because Scala allows that syntax whereas in Java it is illegal (Java has no operator overloading, so `col("end") - col("start")` does not compile; you would need `col("end").minus(col("start"))` or an SQL expression instead).

like image 677
JBoy Avatar asked Feb 04 '19 09:02

JBoy


1 Answer

You can consider static methods. In short:

import static org.apache.spark.sql.functions.expr;
...
df = df
    .withColumn("time_spent", expr("end - start"))
    .drop("start")
    .drop("end");

expr() parses the given SQL expression string and evaluates it against each row's columns, so "end - start" computes the arithmetic difference per row.

Here is the full example with the right imports. Sorry the bulk part of the example is about creating the dataframe.

package net.jgp.books.sparkInAction.ch12.lab990Others;

import static org.apache.spark.sql.functions.expr;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Demonstrates computing a derived column with {@code expr()}: builds a
 * one-row dataframe with {@code start}/{@code end} integer columns, then
 * replaces them with a single {@code time_spent = end - start} column.
 *
 * @author jgp
 */
public class ExprApp {

  /**
   * main() is your entry point to the application.
   *
   * @param args unused command-line arguments
   */
  public static void main(String[] args) {
    ExprApp app = new ExprApp();
    app.start();
  }

  /**
   * The processing code: creates the sample dataframe, shows it, derives
   * the time_spent column via an SQL expression, and shows the result.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Use of expr()") // name matches what this example shows
        .master("local")
        .getOrCreate();

    // Schema: title (string), start/end (non-nullable ints)
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "title",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "start",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "end",
            DataTypes.IntegerType,
            false) });

    // Single sample row matching the question's data
    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create("bla", 10, 30));
    Dataset<Row> df = spark.createDataFrame(rows, schema);
    df.show();

    // expr() evaluates the SQL expression "end - start" per row;
    // the original start/end columns are then dropped.
    df = df
        .withColumn("time_spent", expr("end - start"))
        .drop("start")
        .drop("end");
    df.show();

    // Release the session's resources once the demo is done.
    spark.stop();
  }
}
like image 134
jgp Avatar answered Nov 14 '22 20:11

jgp