I have a very large dataset that is loaded in Hive. It consists of about 1.9 million rows and 1450 columns. I need to determine the "coverage" of each of the columns, meaning, the fraction of rows that have non-NaN values for each column.
Here is my code:
from pyspark import SparkContext
from pyspark.sql import HiveContext
import string as string

sc = SparkContext(appName="compute_coverages")  ## Create the context
sqlContext = HiveContext(sc)

df = sqlContext.sql("select * from data_table")

nrows_tot = df.count()

covgs = sc.parallelize(df.columns) \
    .map(lambda x: str(x)) \
    .map(lambda x: (x, float(df.select(x).dropna().count()) / float(nrows_tot) * 100.))
Trying this out in the pyspark shell, if I then do covgs.take(10), it returns a rather large error stack. It says that there's a problem in save in the file /usr/lib64/python2.6/pickle.py. This is the final part of the error:
py4j.protocol.Py4JError: An error occurred while calling o37.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)
If there is a better way to accomplish this than the way I'm trying, I'm open to suggestions. I can't use pandas, though, as it's not currently available on the cluster I work on and I don't have rights to install it.
Solution: The error comes from referencing the DataFrame df inside an RDD transformation: the lambdas passed to map() must be pickled and shipped to the executors, and a DataFrame (which wraps driver-side JVM objects) cannot be serialized that way. The fix is to express everything as a single DataFrame aggregation instead of looping over columns with RDD operations. To keep only the non-null values of a column, use isNotNull() from the Column class (df.name.isNotNull()); to also exclude NaN values, combine it with the SQL function isnan() (~isnan(df.name)). Together with count(), sum(), and when(), this lets you count NULL, None, and NaN values for every column in one pass.
Let's start with some dummy data:
from pyspark.sql import Row

row = Row("v", "x", "y", "z")
df = sc.parallelize([
    row(0.0, 1, 2, 3.0), row(None, 3, 4, 5.0),
    row(None, None, 6, 7.0), row(float("Nan"), 8, 9, float("NaN"))
]).toDF()

## +----+----+---+---+
## |   v|   x|  y|  z|
## +----+----+---+---+
## | 0.0|   1|  2|3.0|
## |null|   3|  4|5.0|
## |null|null|  6|7.0|
## | NaN|   8|  9|NaN|
## +----+----+---+---+
All you need is a simple aggregation:
from pyspark.sql.functions import col, count, isnan, lit, sum

def count_not_null(c, nan_as_null=False):
    """Use conversion between boolean and integer
    - False -> 0
    - True ->  1
    """
    pred = col(c).isNotNull() & (~isnan(c) if nan_as_null else lit(True))
    return sum(pred.cast("integer")).alias(c)

df.agg(*[count_not_null(c) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+
or if you want to treat NaN as NULL:
df.agg(*[count_not_null(c, True) for c in df.columns]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+
You can also leverage SQL NULL semantics to achieve the same result without creating a custom function:
df.agg(*[
    count(c).alias(c)    # vertical (column-wise) operations in SQL ignore NULLs
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  2|  3|  4|  4|
## +---+---+---+---+
but this won't work with NaNs, because count() only skips NULLs and still counts NaN entries; a NaN-aware variant is sketched below.
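To keep the succinct count(c) style but still ignore NaN values, you can combine it with the when() and isnan() functions mentioned above. This is only a minimal sketch against the same dummy DataFrame, and it assumes every column passed to isnan() is numeric (as they are here):

from pyspark.sql.functions import col, count, isnan, when

# count() ignores NULLs; wrapping each column in when(~isnan(c), ...) turns
# NaN entries into NULLs as well, so both NULL and NaN are excluded
df.agg(*[
    count(when(~isnan(c), col(c))).alias(c)
    for c in df.columns
]).show()

## +---+---+---+---+
## |  v|  x|  y|  z|
## +---+---+---+---+
## |  1|  3|  4|  3|
## +---+---+---+---+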
If you prefer fractions:
exprs = [(count_not_null(c) / count("*")).alias(c) for c in df.columns]
df.agg(*exprs).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
or
# COUNT(*) is equivalent to COUNT(1) so NULLs won't be an issue
df.select(*[(count(c) / count("*")).alias(c) for c in df.columns]).show()

## +---+----+---+---+
## |  v|   x|  y|  z|
## +---+----+---+---+
## |0.5|0.75|1.0|1.0|
## +---+----+---+---+
Scala equivalent:
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, isnan, lit, not, sum}

type JDouble = java.lang.Double

// in spark-shell; in a standalone app also import spark.implicits._ for toDF
val df = Seq[(JDouble, JDouble, JDouble, JDouble)](
  (0.0, 1.0, 2.0, 3.0), (null, 3.0, 4.0, 5.0),
  (null, null, 6.0, 7.0), (java.lang.Double.NaN, 8.0, 9.0, java.lang.Double.NaN)
).toDF()

def count_not_null(c: Column, nanAsNull: Boolean = false) = {
  val pred = c.isNotNull and (if (nanAsNull) not(isnan(c)) else lit(true))
  sum(pred.cast("integer"))
}

df.select(df.columns map (c => count_not_null(col(c)).alias(c)): _*).show

// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// |  2|  3|  4|  4|
// +---+---+---+---+

df.select(df.columns map (c => count_not_null(col(c), true).alias(c)): _*).show

// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// |  1|  3|  4|  3|
// +---+---+---+---+
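Coming back to the original question, the same single aggregation scales to the 1450-column Hive table and avoids the pickling error entirely, because no DataFrame is referenced inside an RDD transformation. The following is only a sketch under the question's setup (a Hive table named data_table read through sqlContext, columns numeric so isnan() applies); it reuses count_not_null from above and returns the coverage of every column as a percentage in a plain Python dict:

from pyspark.sql.functions import count

df = sqlContext.sql("select * from data_table")

# one pass over the table: percentage of rows per column that are
# neither NULL nor NaN
exprs = [(count_not_null(c, True) / count("*") * 100.0).alias(c)
         for c in df.columns]
covgs = df.agg(*exprs).first().asDict()   # {column name: coverage in percent}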