
Spark extracting values from a Row

I have the following dataframe

val transactions_with_counts = sqlContext.sql(
  """SELECT user_id AS user_id, category_id AS category_id,
  COUNT(category_id) FROM transactions GROUP BY user_id, category_id""")

I'm trying to convert the rows to Rating objects, but since x(0) returns Any this fails:

val ratings = transactions_with_counts
  .map(x => Rating(x(0).toInt, x(1).toInt, x(2).toInt))

error: value toInt is not a member of Any
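
For context, Row(i) is declared to return Any, which is why .toInt fails here. A minimal sketch of the explicit-cast workaround, assuming the mllib Rating class and that COUNT(...) comes back as a Long:

import org.apache.spark.mllib.recommendation.Rating

// Row(i) is typed as Any, so each field needs an explicit cast
val ratings = transactions_with_counts.map { x =>
  Rating(x(0).asInstanceOf[Int], x(1).asInstanceOf[Int], x(2).asInstanceOf[Long].toDouble)
}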

Sam asked Oct 08 '15


2 Answers

Let's start with some dummy data:

val transactions = Seq((1, 2), (1, 4), (2, 3)).toDF("user_id", "category_id")

val transactions_with_counts = transactions
  .groupBy($"user_id", $"category_id")
  .count

transactions_with_counts.printSchema

// root
// |-- user_id: integer (nullable = false)
// |-- category_id: integer (nullable = false)
// |-- count: long (nullable = false)

There are a few ways to access Row values and keep expected types:

  1. Pattern matching

    import org.apache.spark.sql.Row

    transactions_with_counts.map {
      case Row(user_id: Int, category_id: Int, rating: Long) =>
        Rating(user_id, category_id, rating)
    }
  2. Typed get* methods like getInt, getLong:

    transactions_with_counts.map(
      r => Rating(r.getInt(0), r.getInt(1), r.getLong(2))
    )
  3. The getAs method, which can use both names and indices:

    transactions_with_counts.map(r => Rating(
      r.getAs[Int]("user_id"), r.getAs[Int]("category_id"), r.getAs[Long](2)
    ))

    It can be used to properly extract user-defined types, including mllib.linalg.Vector. Obviously, accessing by name requires a schema.

  4. Converting to a statically typed Dataset (Spark 1.6+ / 2.0+), as sketched after this list:

    transactions_with_counts.as[(Int, Int, Long)] 
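
As a rough illustration of the last option (assuming Spark 2.x with a SparkSession named spark, and the mllib Rating class), the typed Dataset can then be mapped to Ratings without any casts:

import org.apache.spark.mllib.recommendation.Rating
import spark.implicits._

// each tuple is already (Int, Int, Long), so field types are checked at compile time
val ratings = transactions_with_counts
  .as[(Int, Int, Long)]
  .map { case (user_id, category_id, count) =>
    Rating(user_id, category_id, count.toDouble)
  }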
zero323 answered Sep 24 '22


Using Datasets, you can define Rating as follows:

case class Rating(user_id: Int, category_id: Int, count: Long)

Note that the Rating class here uses the field name 'count' rather than 'rating' as in zero323's answer. The rating value is then assigned as follows:

val transactions_with_counts = transactions
  .groupBy($"user_id", $"category_id")
  .count

val rating = transactions_with_counts.as[Rating]

This way you will not run into runtime errors in Spark, because the field names in your Rating class match the 'count' column name that Spark generates at runtime.
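
A quick sketch of what the typed Dataset buys you (assuming the transactions DataFrame from the accepted answer and spark.implicits._ in scope): field access is statically typed, so no casts from Any are needed:

// fields are statically typed; the compiler catches type mismatches
val first: Rating = rating.head()
val count: Long = first.count

// the Dataset can also be collected as plain case-class instances
val allRatings: Array[Rating] = rating.collect()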

user-asterix answered Sep 23 '22