 

Spark - How to count number of records by key

This is probably an easy problem, but basically I have a dataset where I need to count the number of females for each country. Ultimately I want to group each count by country, but I am unsure what to use as the value, since there is no count column in the dataset that I could use as the value in a groupByKey or reduceByKey. I thought of using reduceByKey(), but that requires a key-value pair, and I only want to count the key and use a counter as the value. How do I go about this?

val lines = sc.textFile("/home/cloudera/desktop/file.txt")
val split_lines = lines.map(_.split(","))
val femaleOnly = split_lines.filter(x => x._10 == "Female")

Here is where I am stuck. The country is at index 13 in the dataset. The output should look something like this: (Australia, 201000), (America, 420000), etc. Any help would be great. Thanks

asked Jun 03 '15 by user2768498


People also ask

How do I count the number of records in a Spark data frame?

To get the number of rows in a PySpark DataFrame, use the count() function. It returns the total number of rows in the DataFrame; because count() is an action, calling it triggers execution of all the transformations behind that DataFrame.
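
A minimal sketch in Scala (the file path and schema here are hypothetical, not from the question):

val df = sqlContext.read.json("/path/to/people.json")
val total: Long = df.count()   // action: runs the job and returns only a Long to the driver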

How do you count records in RDD?

Call count() on the RDD: it is an action that returns the number of records. For example:

join.cache();
double count_ctid = (double) join.count();   // the count of these three RDDs
double all = (double) lines.count();
double count_cfid = all - CFIDNotNull.count();

How does count work in Spark?

count() is an action in PySpark that returns the number of elements in the dataset. The work is distributed: each executor counts the records in its own partitions, and only the final total is brought back to the driver node.
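
A small Scala sketch of that behaviour (the RDD here is hypothetical):

val nums = sc.parallelize(1 to 1000000)   // transformations only: nothing runs yet
val evens = nums.filter(_ % 2 == 0)       // still lazy
val n: Long = evens.count()               // action: executors count their partitions,
                                          // only the final Long reaches the driver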


2 Answers

You're nearly there! All you need is a countByValue:

val countOfFemalesByCountry = femaleOnly.map(_(13)).countByValue()
// Returns Map(Australia -> 230, America -> 23242), etc.

(In your example, I assume you meant x(10) rather than x._10: split returns an Array, which is indexed with parentheses, whereas ._n is tuple syntax.)
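
The difference, as a quick illustration:

val arr = "a,b,c".split(",")   // Array[String]
arr(1)                         // "b": arrays are indexed with arr(i)
val tup = ("a", "b", "c")      // a Tuple3
tup._2                         // "b": tuples use the ._n accessors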

All together:

sc.textFile("/home/cloudera/desktop/file.txt")
    .map(_.split(","))
    .filter(x => x(10) == "Female")
    .map(_(13))
    .countByValue()
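
Note that countByValue is an action that returns a local Map[String, Long] to the driver, so it assumes the number of distinct countries is small. To print the (country, count) pairs in the shape the question asks for (reusing femaleOnly from the question, with the x(10) fix):

val counts = femaleOnly.map(_(13)).countByValue()
counts.foreach { case (country, n) => println(s"($country, $n)") }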
answered Sep 16 '22 by dpeacock


Have you considered manipulating your RDD using the DataFrames API?

It looks like you're loading a CSV file, which you can do with spark-csv.

Then it's a simple matter (if your CSV has a header row with the obvious column names) of:

import com.databricks.spark.csv._
import sqlContext.implicits._  // needed for the $"column" syntax

val countryGender = sqlContext.csvFile("/home/cloudera/desktop/file.txt") // already splits by field
  .filter($"gender" === "Female")
  .groupBy("country")
  .count()

countryGender.show()

If you want to go deeper in this kind of manipulation, here's the guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
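
If you prefer plain SQL, the same query can be written by registering the DataFrame as a temporary table (a sketch, assuming the same file and the gender/country header names):

val people = sqlContext.csvFile("/home/cloudera/desktop/file.txt")
people.registerTempTable("people")
sqlContext.sql("SELECT country, COUNT(*) AS cnt FROM people WHERE gender = 'Female' GROUP BY country").show()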

answered Sep 19 '22 by Francois G