I'm trying to load Parquet data into PySpark, where one column has a space in its name:
df = spark.read.parquet('my_parquet_dump')
df.select(df['Foo Bar'].alias('foobar'))
Even though I have aliased the column, I'm still getting an error propagating from the JVM side of PySpark. I've attached the stack trace below.
Is there a way I can load this parquet file into PySpark, without pre-processing the data in Scala, and without modifying the source parquet file?
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/usr/local/python/pyspark/sql/utils.py in deco(*a, **kw)
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
/usr/local/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
Py4JJavaError: An error occurred while calling o864.collectToPython.
: org.apache.spark.sql.AnalysisException: Attribute name "Foo Bar" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:581)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:567)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$$anonfun$checkFieldNames$1.apply(ParquetSchemaConverter.scala:575)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$$anonfun$checkFieldNames$1.apply(ParquetSchemaConverter.scala:575)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldNames(ParquetSchemaConverter.scala:575)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:293)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:285)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:283)
at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:303)
at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:42)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:386)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2803)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2800)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2823)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2800)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
AnalysisException Traceback (most recent call last)
<ipython-input-37-9d7c55a5465c> in <module>()
----> 1 spark.sql("SELECT `Foo Bar` as hey FROM df limit 10").take(1)
/usr/local/python/pyspark/sql/dataframe.py in take(self, num)
474 [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
475 """
--> 476 return self.limit(num).collect()
477
478 @since(1.3)
/usr/local/python/pyspark/sql/dataframe.py in collect(self)
436 """
437 with SCCallSiteSync(self._sc) as css:
--> 438 port = self._jdf.collectToPython()
439 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
440
/usr/local/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/local/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Attribute name "Foo Bar" contains invalid character(s) among " ,;{}()\\n\\t=". Please use alias to rename it.;'
Have you tried
df = df.withColumnRenamed("Foo Bar", "foobar")
When you select the column with an alias, you're still passing the original (invalid) column name through the select clause.
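As a minimal sketch (using the same 'my_parquet_dump' path from the question; the column names are just illustrative), the rename would go right after the read and before any select:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the parquet dump, then rename the offending column before it is
# referenced anywhere else in the query.
df = spark.read.parquet('my_parquet_dump')
df = df.withColumnRenamed('Foo Bar', 'foobar')

# From here on, only the new name is used.
df.select('foobar').show()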
I tried @ktang's method and it worked for me as well. I'm working with SQL and Python, so your setup may be different, but it worked nonetheless.
Ensure there are no spaces in your column names/headers. Note that the space ( ) is actually the first character in the list of invalid characters shown in the error message (" ,;{}()\n\t="), so PySpark does not accept it.
I was initially using:
master_df = spark.sql(f'''
SELECT occ.num, 'Occ Event' AS `Event Type`,
...
Changing the space ( ) in Event Type to an underscore (_) worked:
master_df = spark.sql(f'''
SELECT occ.num, 'Occ Event' AS `Event_Type`,
...
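If several columns contain spaces, a more general variant of the same idea (a sketch, not part of the original answer; the path and names are assumptions) is to replace spaces in every column name right after reading:
df = spark.read.parquet('my_parquet_dump')

# Replace spaces with underscores in every column name so that none of the
# names trip Parquet's invalid-character check.
for old_name in df.columns:
    if ' ' in old_name:
        df = df.withColumnRenamed(old_name, old_name.replace(' ', '_'))

df.createOrReplaceTempView('df')  # columns like Foo_Bar can now be used in spark.sql queries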