Spark Scala - How to group dataframe rows and apply complex function to the groups?

I am trying to solve this super simple problem and I am already sick of it; I hope somebody can help me out with this. I have a DataFrame that looks like this:

+----------+------------+
| Category | Product_ID |
+----------+------------+
| a        | product 1  |
| a        | product 2  |
| a        | product 3  |
| a        | product 1  |
| a        | product 4  |
| b        | product 5  |
| b        | product 6  |
+----------+------------+

How do I group these rows by Category and apply a complicated function to each group in Scala? Maybe something like this:

val result = df.groupBy("Category").apply(myComplexFunction)

This myComplexFunction should produce the following table of pairwise similarities for each category and upload it into a Hive table or save it to HDFS:


+-----------+-----------+-----------+-----------+
|           | Product_1 | Product_2 | Product_3 |
+-----------+-----------+-----------+-----------+
| Product_1 | 1.0       | 0.1       | 0.8       |
| Product_2 | 0.1       | 1.0       | 0.5       |
| Product_3 | 0.8       | 0.5       | 1.0       |
+-----------+-----------+-----------+-----------+

Here is the function I want to apply (it just computes item-item cosine similarity within each category):

import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, RowMatrix}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{DoubleType, LongType, StringType, StructField, StructType}

def myComplexFunction(context_data: DataFrame, country_name: String,
                      context_id: String, table_name_correlations: String,
                      context_layer: String, context_index: String): Boolean = {
            val unique_identifier =   country_name + "_" +  context_layer + "_" + context_index
            val temp_table_vocabulary = "temp_vocabulary_" + unique_identifier
            val temp_table_similarities = "temp_similarities_" + unique_identifier
            val temp_table_correlations = "temp_correlations_" + unique_identifier


            //context.count()
            // fit a CountVectorizerModel from the corpus
            //println("Creating sparse incidence matrix")
            val cvModel: CountVectorizerModel = new CountVectorizer().setInputCol("words").setOutputCol("features").fit(context_data)
            val incidence = cvModel.transform(context_data)

            // ========================================================================================
            // create dataframe of mapping from indices into the item id
            //println("Creating vocabulary")
            val vocabulary_rdd = sc.parallelize(cvModel.vocabulary)
            val rows_vocabulary_rdd = vocabulary_rdd.zipWithIndex.map{ case (s,i) => Row(s,i)}
            val vocabulary_field1 = StructField("Product_ID", StringType, true)
            val vocabulary_field2 = StructField("Product_Index", LongType, true)
            val schema_vocabulary = StructType(Seq(vocabulary_field1, vocabulary_field2))
            val df_vocabulary = hiveContext.createDataFrame(rows_vocabulary_rdd, schema_vocabulary)

            // ========================================================================================
            //println("Computing similarity matrix")
            val myvectors = incidence.select("features").rdd.map(r => r(0).asInstanceOf[Vector])
            val mat: RowMatrix = new RowMatrix(myvectors)
            val sims = mat.columnSimilarities(0.0)

            // ========================================================================================
            // Convert records of the Matrix Entry RDD into Rows
            //println("Extracting paired similarities")
            val rowRdd = sims.entries.map{case MatrixEntry(i, j, v) => Row(i, j, v)}

            // ========================================================================================
            // create dataframe schema
            //println("Creating similarity dataframe")
            val field1 = StructField("Product_Index", LongType, true)
            val field2 = StructField("Neighbor_Index", LongType, true)
            val field3 = StructField("Similarity_Score", DoubleType, true)
            val schema_similarities = StructType(Seq(field1, field2, field3))

            // create the dataframe
            val df_similarities = hiveContext.createDataFrame(rowRdd, schema_similarities)

            // ========================================================================================
            //println("Register vocabulary and correlations as spark temp tables")
            df_vocabulary.registerTempTable(temp_table_vocabulary)
            df_similarities.registerTempTable(temp_table_similarities)

            // ========================================================================================
            //println("Extracting Product_ID")
            val temp_corrs = hiveContext.sql(
                s"SELECT T1.Product_ID, T2.Neighbor_ID, T1.Similarity_Score " +
                    s"FROM " +
                    s"(SELECT Product_ID, Neighbor_Index, Similarity_Score " +
                    s"FROM $temp_table_similarities LEFT JOIN  $temp_table_vocabulary " +
                    s"WHERE $temp_table_similarities.Product_Index = $temp_table_vocabulary.Product_Index) AS T1 " +
                    s"LEFT JOIN " +
                    s"(SELECT Product_ID AS Neighbor_ID, Product_Index as Neighbor_Index FROM $temp_table_vocabulary) AS T2 " +
                    s"ON " +
                    s"T1.Neighbor_Index = T2.Neighbor_Index")

            // ========================================================================================
            val context_corrs = temp_corrs.withColumn("Context_Layer", lit(context_layer)).withColumn("Context_ID", lit(context_id)).withColumn("Country", lit(country_name))
            context_corrs.registerTempTable(temp_table_correlations)

            // ========================================================================================
            hiveContext.sql(s"INSERT INTO TABLE $table_name_correlations SELECT * FROM $temp_table_correlations")

            // ========================================================================================
            // clean up environment
            //println("Cleaning up temp tables")
            hiveContext.dropTempTable(temp_table_correlations)
            hiveContext.dropTempTable(temp_table_similarities)
            hiveContext.dropTempTable(temp_table_vocabulary)

            return true
        }

        val partitioned = tokenized.repartition(tokenized("context_id"))
        val context_counts = partitioned.mapPartitions()
        //val context_counts = model_code_ids.zipWithIndex.map{case (model_code_id, context_index) => compute_similarity(tokenized.filter(tokenized("context_id") === model_code_id), country_name, model_code_id.asInstanceOf[String], table_name_correlations, context_layer, context_index.toString)}

    }

I have already tried the following:

val category_ids = df.select("Category").distinct.collect()
val result = category_ids.map(category_id => myComplexFunction(df.filter(df("Category") <=> category_id)))

I don't know why, but this approach runs sequentially instead of in parallel.

asked Nov 18 '16 by Norman D



1 Answer

Cosine similarity is not a complex function and can be expressed using standard SQL aggregations. Let's consider the following example:

val df = Seq(
  ("feat1", 1.0, "item1"),
  ("feat2", 1.0, "item1"),
  ("feat6", 1.0, "item1"),
  ("feat1", 1.0, "item2"),
  ("feat3", 1.0, "item2"),
  ("feat4", 1.0, "item3"),
  ("feat5", 1.0, "item3"),
  ("feat1", 1.0, "item4"),
  ("feat6", 1.0, "item4")
).toDF("feature", "value", "item")

where feature is a feature identifier, value is the corresponding value, item is an object identifier, and each (feature, item) pair has only one corresponding value.

Cosine similarity is defined as:

cosine(A, B) = (A · B) / (||A|| · ||B||)

where the numerator can be computed as:

import org.apache.spark.sql.functions.sum

val numer = df.as("this").withColumnRenamed("item", "this")
  .join(df.as("other").withColumnRenamed("item", "other"), Seq("feature"))
  .where($"this" < $"other")
  .groupBy($"this", $"other")
  .agg(sum($"this.value" * $"other.value").alias("dot"))

and norms used in the denominator as:

import org.apache.spark.sql.functions.sqrt

val norms = df.groupBy($"item").agg(sqrt(sum($"value" * $"value")).alias("norm"))

// Combined together:

val cosine = ($"dot" / ($"this_norm.norm" * $"other_norm.norm")).as("cosine") 

val similarities = numer
 .join(norms.alias("this_norm").withColumnRenamed("item", "this"), Seq("this"))
 .join(norms.alias("other_norm").withColumnRenamed("item", "other"), Seq("other"))
 .select($"this", $"other", cosine)

with the result representing the non-zero entries of the upper triangular matrix, ignoring the diagonal (which is trivially 1.0). As a quick check, item1 and item4 share feat1 and feat6, so their dot product is 2.0 and their cosine is 2.0 / (√3 · √2) ≈ 0.816:

+-----+-----+-------------------+
| this|other|             cosine|
+-----+-----+-------------------+
|item1|item4| 0.8164965809277259|
|item1|item2|0.40824829046386296|
|item2|item4| 0.4999999999999999|
+-----+-----+-------------------+
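
Since the question also asks to store the pairwise similarities in a Hive table or on HDFS, one minimal sketch of persisting this result is shown below; the table name and path are illustrative and a Hive-enabled context is assumed:

// Append the pairwise similarities to a Hive table (illustrative name):
similarities.write.mode("append").saveAsTable("product_similarities")

// or write them to HDFS as Parquet files (illustrative path):
similarities.write.mode("overwrite").parquet("hdfs:///tmp/product_similarities")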

The DataFrame-based computation above should be equivalent to:

import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.mllib.linalg.Vectors

val pivoted = df.groupBy("item").pivot("feature").sum()
  .na.fill(0.0)
  .orderBy("item")

val mat = new IndexedRowMatrix(pivoted
  .select(array(pivoted.columns.tail.map(col): _*))
  .rdd
  .zipWithIndex
  .map {
    case (row, idx) => 
      new IndexedRow(idx, Vectors.dense(row.getSeq[Double](0).toArray))
  })

mat.toCoordinateMatrix.transpose
  .toIndexedRowMatrix.columnSimilarities
  .toBlockMatrix.toLocalMatrix
0.0  0.408248290463863  0.0  0.816496580927726
0.0  0.0                0.0  0.4999999999999999
0.0  0.0                0.0  0.0
0.0  0.0                0.0  0.0
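
If you want item identifiers rather than bare matrix indices, one possible way to map them back is sketched below; it relies on the orderBy("item") used to build pivoted above, and labels and labelled are illustrative names, not part of the original answer:

import org.apache.spark.mllib.linalg.distributed.MatrixEntry

// Because pivoted is ordered by "item", row/column index i corresponds to
// the i-th item in that ordering.
val labels = pivoted.select("item").rdd.map(_.getString(0)).collect()

val labelled = mat.toCoordinateMatrix.transpose
  .toIndexedRowMatrix.columnSimilarities
  .entries
  .map { case MatrixEntry(i, j, v) => (labels(i.toInt), labels(j.toInt), v) }
  .toDF("this", "other", "cosine")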

Regarding your code:

  • Execution is sequential because your code operates on a local (collected) collection.
  • myComplexFunction cannot be distributed any further, because it itself uses distributed data structures and Spark contexts, which can only be used from the driver.
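
The question asks for similarities computed separately for each Category, and the aggregation above parallelizes across all categories in a single job once Category is added to the join and grouping keys. Here is a minimal sketch under that assumption; the long-format input gains a Category column, and all names (catDf, dots, norms, perCategory) are illustrative:

import org.apache.spark.sql.functions.{sqrt, sum}

// Assumed layout: one row per (Category, item, feature) with its value.
val catDf = Seq(
  ("a", "feat1", 1.0, "product 1"),
  ("a", "feat1", 1.0, "product 2"),
  ("a", "feat2", 1.0, "product 2"),
  ("b", "feat3", 1.0, "product 5")
).toDF("Category", "feature", "value", "item")

// Dot products, computed within each category only.
val dots = catDf.as("this").withColumnRenamed("item", "this")
  .join(catDf.as("other").withColumnRenamed("item", "other"), Seq("Category", "feature"))
  .where($"this" < $"other")
  .groupBy($"Category", $"this", $"other")
  .agg(sum($"this.value" * $"other.value").alias("dot"))

// Norms, also per category.
val norms = catDf
  .groupBy($"Category", $"item")
  .agg(sqrt(sum($"value" * $"value")).alias("norm"))

// Cosine similarity per (Category, this, other); a single distributed job
// covers every category, so no driver-side loop over categories is needed.
val perCategory = dots
  .join(norms.withColumnRenamed("item", "this").withColumnRenamed("norm", "this_norm"),
        Seq("Category", "this"))
  .join(norms.withColumnRenamed("item", "other").withColumnRenamed("norm", "other_norm"),
        Seq("Category", "other"))
  .select($"Category", $"this", $"other",
          ($"dot" / ($"this_norm" * $"other_norm")).alias("cosine"))

If you really need to run arbitrary Scala logic per group, Dataset.groupByKey(...).mapGroups is the typed alternative, but the function you pass runs on the executors and therefore cannot itself create DataFrames or use any Spark context, which is exactly the limitation described above.
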
answered by zero323