 

How do I collect a List of Strings from a Spark DataFrame column after a GroupBy operation?

The solution described here (by zero323) is very close to what I want, but with two twists:

  1. How do I do it in Java?
  2. What if the column holds a List of Strings instead of a single String, and I want to collect all such lists into a single list after a GroupBy on some other column?

I am using Spark 1.6 and have tried to use

org.apache.spark.sql.functions.collect_list(Column col)

as described in the solution to that question, but got the following error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: undefined function collect_list;
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:65)
    at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry$$anonfun$2.apply(FunctionRegistry.scala:65)
    at scala.Option.getOrElse(Option.scala:121)

Asked Feb 10 '16 by Kai



1 Answer

The error you see suggests that you are using a plain SQLContext, not a HiveContext. collect_list is a Hive UDF and as such requires a HiveContext. It also doesn't support complex columns, so the only option is to explode first:

import org.apache.spark.api.java.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.hive.HiveContext;
import java.util.*;
import org.apache.spark.sql.DataFrame;
import static org.apache.spark.sql.functions.*;

public class App {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext(new SparkConf());
    // collect_list is a Hive UDF in Spark 1.6, so a HiveContext is required.
    SQLContext sqlContext = new HiveContext(sc);
    List<String> data = Arrays.asList(
            "{\"id\": 1, \"vs\": [\"a\", \"b\"]}",
            "{\"id\": 1, \"vs\": [\"c\", \"d\"]}",
            "{\"id\": 2, \"vs\": [\"e\", \"f\"]}",
            "{\"id\": 2, \"vs\": [\"g\", \"h\"]}"
    );
    DataFrame df = sqlContext.read().json(sc.parallelize(data));
    // Flatten the array column into one row per element, then group by id
    // and collect the exploded values back into a single list per group.
    df.withColumn("vs", explode(col("vs")))
      .groupBy(col("id"))
      .agg(collect_list(col("vs")))
      .show();
  }
}
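
For the sample data above the output should look roughly like this (the order of elements within each collected list is not guaranteed):

+---+----------------+
| id|collect_list(vs)|
+---+----------------+
|  1|    [a, b, c, d]|
|  2|    [e, f, g, h]|
+---+----------------+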

It is rather unlikely to perform well, though.
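
If the explode / collect_list round trip turns out to be too slow, one possible workaround (not from the original answer, just a sketch assuming the schema of the example above: id as a long, vs as an array of strings) is to drop down to the RDD API and concatenate the lists per key with reduceByKey, skipping the explode entirely:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Row;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;

// Reuses the df from the example above.
JavaRDD<Row> rows = df.javaRDD();

// One (id, list) pair per input row; copy into a mutable list up front.
JavaPairRDD<Long, List<String>> pairs = rows.mapToPair(
    (Row row) -> new Tuple2<Long, List<String>>(
        row.getLong(row.fieldIndex("id")),
        new ArrayList<String>(row.<String>getList(row.fieldIndex("vs")))));

// Merge the per-row lists key by key without exploding them first.
JavaPairRDD<Long, List<String>> collected = pairs.reduceByKey(
    (a, b) -> {
        List<String> merged = new ArrayList<String>(a); // don't mutate the inputs
        merged.addAll(b);
        return merged;
    });

System.out.println(collected.collectAsMap());

This avoids the explode step, but whether it is actually faster depends on your data, so it is worth measuring both.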

Answered by zero323