
pyspark: TypeError: IntegerType can not accept object in type <type 'unicode'>

I am programming with PySpark on a Spark cluster. The data is large and split into pieces, so it cannot be loaded into memory and its sanity cannot be checked easily.

The input is Wikipedia data that basically looks like this:

af.b Current%20events 1 996
af.b Kategorie:Musiek 1 4468
af.b Spesiaal:RecentChangesLinked/Gebruikerbespreking:Freakazoid 1 5209
af.b Spesiaal:RecentChangesLinked/Sir_Arthur_Conan_Doyle 1 5214

I read it from AWS S3 and then try to construct a Spark DataFrame with the following Python code in the pyspark interpreter:

parts = data.map(lambda l: l.split())
wikis = parts.map(lambda p: (p[0], p[1],p[2],p[3]))


fields = [StructField("project", StringType(), True),
          StructField("title", StringType(), True),
          StructField("count", IntegerType(), True),
          StructField("byte_size", StringType(), True)]

schema = StructType(fields)

df = sqlContext.createDataFrame(wikis, schema)

Everything looks fine, except that createDataFrame gives me an error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 404, in createDataFrame
    rdd, schema = self._createFromRDD(data, schema, samplingRatio)
  File "/usr/lib/spark/python/pyspark/sql/context.py", line 298, in _createFromRDD
    _verify_type(row, schema)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1152, in _verify_type
    _verify_type(v, f.dataType)
  File "/usr/lib/spark/python/pyspark/sql/types.py", line 1136, in _verify_type
    raise TypeError("%s can not accept object in type %s" % (dataType, type(obj)))
TypeError: IntegerType can not accept object in type <type 'unicode'>

Why can't I set the third column, which should be count, to IntegerType? How can I solve this?

Hello lad asked Oct 14 '15 15:10


1 Answer

As noted by ccheneson, you are passing the wrong types.

Assuming your data looks like this:

data = sc.parallelize(["af.b Current%20events 1 996"])

After the first map you get RDD[List[String]]:

parts = data.map(lambda l: l.split())
parts.first()
## ['af.b', 'Current%20events', '1', '996']

The second map converts each list to a tuple (String, String, String, String):

wikis = parts.map(lambda p: (p[0], p[1], p[2],p[3]))
wikis.first()
## ('af.b', 'Current%20events', '1', '996')

Your schema states that the 3rd column is an integer:

[f.dataType for f in schema.fields]
## [StringType, StringType, IntegerType, StringType]

The schema is used mostly to avoid a full table scan to infer types; it doesn't perform any type casting.
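The check that raises the error can be approximated in plain Python. This is only a minimal sketch, not Spark's actual _verify_type code; find_type_mismatches and expected_types are hypothetical names introduced for illustration. It shows that a row produced by split() holds only strings, so a field declared as an integer is rejected rather than converted:

```python
def find_type_mismatches(row, expected_types):
    """Return (index, value, expected_type) for every field whose Python type differs.

    Hypothetical helper: it only compares types and never casts values.
    """
    return [
        (i, value, expected)
        for i, (value, expected) in enumerate(zip(row, expected_types))
        if not isinstance(value, expected)
    ]


# Same shape as the rows in the question: split() yields strings only.
row = tuple("af.b Current%20events 1 996".split())
expected_types = (str, str, int, str)  # mirrors the declared schema

mismatches = find_type_mismatches(row, expected_types)
print(mismatches)
```

The only mismatch reported is the third field, which is the one the traceback complains about.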

You can either cast your data during the last map:

wikis = parts.map(lambda p: (p[0], p[1], int(p[2]), p[3]))

Or define count as a StringType and cast the column:

from pyspark.sql.functions import col

fields[2] = StructField("count", StringType(), True)
schema = StructType(fields)

wikis.toDF(schema).withColumn("cnt", col("count").cast("integer")).drop("count")
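Because the data cannot easily be inspected up front, a bare int(p[2]) in the first option would raise a ValueError on the first line whose third field is not a number. A more defensive parser might look like the sketch below. parse_line is a hypothetical helper, and it assumes every valid line has exactly four whitespace-separated fields, as in the sample data. Lines that do not match return None so they can be filtered out:

```python
def parse_line(line):
    """Parse one line into (project, title, count, byte_size), or None if malformed."""
    parts = line.split()
    if len(parts) != 4:
        return None
    try:
        count = int(parts[2])
    except ValueError:
        return None
    # byte_size stays a string, matching the StringType in the question's schema.
    return (parts[0], parts[1], count, parts[3])


# Made-up sample: one valid line, one with too few fields, one with a non-numeric count.
sample = [
    "af.b Current%20events 1 996",
    "af.b broken line",
    "af.b Kategorie:Musiek x 4468",
]
parsed = [parse_line(line) for line in sample]
good = [p for p in parsed if p is not None]
print(good)

# With Spark, the same function could be applied as follows
# (assuming `data` is the RDD of lines from the question):
# wikis = data.map(parse_line).filter(lambda p: p is not None)
```

Dropping bad lines silently is a design choice; counting or logging them instead may be preferable if you need to know how much data was discarded.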

On a side note, count is a reserved word in SQL and shouldn't be used as a column name. In Spark it will work as expected in some contexts and fail in others.

zero323 answered Sep 22 '22 19:09