
Count number of non-NaN entries in each column of Spark dataframe with Pyspark

I have a very large dataset that is loaded in Hive. It consists of about 1.9 million rows and 1450 columns. I need to determine the "coverage" of each of the columns, meaning, the fraction of rows that have non-NaN values for each column.

Here is my code:

from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string

sc = SparkContext(appName="compute_coverages")  ## Create the context
sqlContext = HiveContext(sc)

df = sqlContext.sql("select * from data_table")
nrows_tot = df.count()

covgs = sc.parallelize(df.columns) \
          .map(lambda x: str(x)) \
          .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))

Trying this out in the pyspark shell, if I then do covgs.take(10), it returns a rather large error stack. It says that there's a problem in save in the file /usr/lib64/python2.6/pickle.py. This is the final part of the error:

py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
        at py4j.Gateway.invoke(Gateway.java:252)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:207)
        at java.lang.Thread.run(Thread.java:745)

If there is a better way to accomplish this than the way I'm trying, I'm open to suggestions. I can't use pandas, though, as it's not currently available on the cluster I work on and I don't have rights to install it.

asked Nov 24 '15 by RKD314

1 Answer

The error comes from referencing df inside the RDD transformations: the lambdas have to be pickled and shipped to the executors, and a DataFrame (which wraps a JVM object) cannot be serialized that way. There is no need for the RDD API here at all; a single DataFrame aggregation will do.

Let's start with some dummy data:

from pyspark.sql import Row

row = Row("v", "x", "y", "z")
df = sc.parallelize([
    row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
    row(None, None, 6, 7.0), row(float("NaN"), 8, 9, float("NaN"))
]).toDF()

## +----+----+---+---+
## |   v|   x|  y|  z|
## +----+----+---+---+
## | 0.0|   1|  2|3.0|
## |null|   3|  4|5.0|
## |null|null|  6|7.0|
## | NaN|   8|  9|NaN|
## +----+----+---+---+

All you need is a simple aggregation:

from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True  -> 1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df.agg(*[count_not_null(c) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+
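If you want the per-column counts back on the driver as an ordinary Python dictionary, you can collect the single aggregated row. A small follow-up sketch (mine, not part of the original answer), using the standard Row.asDict() method:

counts = df.agg(*[count_not_null(c) for c in df.columns]).first().asDict()
## e.g. {'v': 2, 'x': 3, 'y': 4, 'z': 4}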

or, if you want to treat NaN as NULL:

df.agg(*[count_not_null(c, True) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+

You can also leverage SQL NULL semantics to achieve the same result without creating a custom function:

df.agg(*[
    count(c).alias(c)    # vertical (column-wise) operations in SQL ignore NULLs
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+

but this won't work with NaNs.
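If you need NaN-awareness but would rather avoid the helper function, a possible workaround (my sketch, not from the original answer) is to wrap each column in when(): when() without an otherwise() yields NULL where the predicate is false, and count() skips those NULLs:

from pyspark.sql.functions import col, count, isnan, when

df.agg(*[
    # NULL where the value is null or NaN, hence excluded from count()
    count(when(col(c).isNotNull() & ~isnan(col(c)), col(c))).alias(c)
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+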

If you prefer fractions:

exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+

or

# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
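Mapping this back onto the original question (the sqlContext and data_table name are taken from the question; expressing coverage as a percentage and collecting it into a dict is my own choice), the whole computation fits in a single pass over the table:

from pyspark.sql.functions import count

df = sqlContext.sql("select * from data_table")
covgs = df.agg(*[
    (count(c) / count("*") * 100.0).alias(c)    # coverage in percent
    for c in df.columns
]).first().asDict()
## covgs maps each column name to its percentage of non-NULL rows

If NaN should not count towards coverage, swap count(c) for count_not_null(c, True) from above.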

Scala equivalent:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, isnan, lit, not, sum}

type JDouble = java.lang.Double

val df = Seq[(JDouble, JDouble, JDouble, JDouble)](
  (0.0, 1.0, 2.0, 3.0), (null, 3.0, 4.0, 5.0),
  (null, null, 6.0, 7.0), (java.lang.Double.NaN, 8.0, 9.0, java.lang.Double.NaN)
).toDF()

def count_not_null(c: Column, nanAsNull: Boolean = false) = {
  // same boolean -> integer trick as in the Python version
  val pred = c.isNotNull and (if (nanAsNull) not(isnan(c)) else lit(true))
  sum(pred.cast("integer"))
}

df.select(df.columns map (c => count_not_null(col(c)).alias(c)): _*).show

// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// |  2|  3|  4|  4|
// +---+---+---+---+

df.select(df.columns map (c => count_not_null(col(c), true).alias(c)): _*).show

// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// |  1|  3|  4|  3|
// +---+---+---+---+
answered Sep 25 '22 by zero323