
pyspark .show() works but .collect() does not

Tags:

pyspark

I have a very simple PySpark program that uses a DataFrame to query data from a group of ORC files. I am using Anaconda Python on Windows and installed PySpark on it.

The program goes like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").getOrCreate()
df_orc = spark.read.orc("./raw_data/")
df_orc.createOrReplaceTempView("orc")

This works fine:

spark.sql("select count(*) from orc").show()

But this will generate errors:

spark.sql("select count(*) from orc").collect()

The error messages are:

WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

Py4JJavaError: An error occurred while calling o81.collectToPython.
: java.lang.IllegalArgumentException
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.<init>(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$.getClassReader(ClosureCleaner.scala:46)
        at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.sca
        at org.apache.spark.util.FieldAccessFinder$$anon$3$$anonfun$visitMethodInsn$2.apply(ClosureCleaner.sca
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
        at scala.collection.mutable.HashMap$$anon$1$$anonfun$foreach$2.apply(HashMap.scala:103)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap$$anon$1.foreach(HashMap.scala:103)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at org.apache.spark.util.FieldAccessFinder$$anon$3.visitMethodInsn(ClosureCleaner.scala:426)
        at org.apache.xbean.asm5.ClassReader.a(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.b(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.xbean.asm5.ClassReader.accept(Unknown Source)
        at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(
        at org.apache.spark.util.ClosureCleaner$$anonfun$org$apache$spark$util$ClosureCleaner$$clean$14.apply(
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.sc
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
        at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:278)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2808)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
        at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2805)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2828)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2805)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.base/java.lang.Thread.run(Unknown Source)
asked Feb 07 '18 by Bo Qiang

People also ask

What does collect () do in PySpark?

PySpark collect() retrieves data from a DataFrame or RDD. It is an operation that gathers all the rows from every partition and brings them back to the driver program.
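
For illustration, here is a minimal sketch (the tiny DataFrame is made up for the example; it is not the ORC data from the question):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("collect-demo").getOrCreate()

# Hypothetical two-row DataFrame, only to show what collect() returns
df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])

rows = df.collect()        # gathers every row from all partitions onto the driver
for row in rows:           # rows is a plain Python list of Row objects
    print(row.id, row.letter)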

What does show () do in PySpark?

Spark/PySpark DataFrame show() displays the contents of the DataFrame in a table of rows and columns. By default it shows only 20 rows, and column values are truncated at 20 characters.
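
Continuing the small sketch above, show() also takes optional arguments for the row count and truncation:

df.show()                     # prints up to 20 rows, long values truncated at 20 characters
df.show(n=5, truncate=False)  # print at most 5 rows and do not truncate column values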

Is collect an action in Spark?

Spark collect() and collectAsList() are actions that retrieve all the elements of the RDD/DataFrame/Dataset (from all nodes) to the driver node.

What does PySpark collect return?

collect() returns an array of Row objects. deptDF.collect()[0] returns the first element of that array (the first row).
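
Continuing the same sketch, indexing into the list returned by collect() gives individual Row objects:

first = df.collect()[0]           # the first Row in the result
print(first)                      # e.g. Row(id=1, letter='a')
print(first["id"], first.letter)  # fields can be read by name or as attributes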


1 Answer

The reason why this works fine:

spark.sql("select count(*) from orc").show()

is because .show() only displays the first 20 rows of your data (the default for show())

but when you run:

spark.sql("select count(*) from orc").collect()

.collect() will pull all of your data back to the driver
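
As a rough illustration of the difference, using the query from the question (this is just the standard pattern, nothing specific to the error): show() formats and prints on the driver and returns None, while collect() hands the result rows back to Python:

result = spark.sql("select count(*) from orc")

result.show()            # prints the single count row, returns None
rows = result.collect()  # returns a list of Row objects to the Python driver
print(rows[0][0])        # the actual count value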

from your error message:

WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.

According to the Spark documentation:

The performance overhead of creating and logging strings for wide schemas can be large. To limit the impact, we bound the number of fields to include by default. This can be overridden by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.

However, this might affect the performance of your job, so you need something like:

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .appName('Notebook') \
    .config('spark.sql.debug.maxToStringFields', 200) \
    .getOrCreate()
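
If the session already exists, the same option can also be set at runtime; this is an assumption on my part that applies where spark.sql.debug.maxToStringFields is exposed as a SQL conf (newer Spark versions):

# Assumes an existing SparkSession named spark and a Spark version where
# spark.sql.debug.maxToStringFields is available as a runtime SQL conf
spark.conf.set("spark.sql.debug.maxToStringFields", 200)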

This is a newer SQL config, spark.sql.debug.maxToStringFields, that controls the maximum number of fields up to which truncatedString cuts its input sequences.

The default value is DEFAULT_MAX_TO_STRING_FIELDS = 25.

You can also add spark.sql.debug.maxToStringFields=100 to spark-defaults.conf

Previous versions of Spark used spark.debug.maxToStringFields instead of spark.sql.debug.maxToStringFields

answered Oct 15 '22 by nferreira78