
How to Store a Python bytestring in a Spark Dataframe

I'm looking at a mystery. I have a bunch of long documents available as Python bytestrings (b"I'm a byte string") in an RDD. Now I convert this RDD to a DataFrame in order to join it to another DataFrame. I do that like this:

Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)

Data_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", StringType(), True),
])

Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)

print(Data_DF.show(5))

+--------------------+-----------+
|                 URI|    Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows

These short "[B@10628e42" strings seem fairly useless to me and are probably some kind of pointer. The bytestrings are still 'intact' in the RDD, because I can still access them there, so the problem occurs in the conversion from RDD to DataFrame. I then tried to store the bytestrings in fields of other types, namely ByteType() and BinaryType(). Neither works, because the bytestrings are not accepted, with these error messages:

TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>
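
The failing attempts looked roughly like this (a minimal sketch reusing Data_RDD from above; the schema name Attempt_schema is just illustrative):

from pyspark.sql.types import StructType, StructField, StringType, BinaryType, ByteType

Attempt_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", BinaryType(), True),  # same error with ByteType()
])

# Raises: TypeError: BinaryType can not accept object b'...' in type <class 'bytes'>
Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Attempt_schema)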

But it gets even weirder. When I set up a small-scale experiment:

ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))

DF2_schema = StructType([
    StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)

print(DF_ByteStrings.show())

The small bytestrings are not accepted in a StringType column either. When I try to run this I get this error message:

StructType can not accept object b'one' in type <class 'bytes'>

When I try to let Spark infer a type, it also fails with this message:

TypeError: Can not infer schema for type: <class 'bytes'>
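
That inference attempt is simply this (roughly, using the rdd_ByteStrings from above):

# Letting Spark infer the schema from bare bytes objects also fails:
DF_inferred = sqlContext.createDataFrame(rdd_ByteStrings)
# TypeError: Can not infer schema for type: <class 'bytes'>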

So, any ideas how I could store bytestrings in a DataFrame without .decode()-ing them first? That is something I can only do after I have joined the two DataFrames, because the other one holds the decoding information.

I use Python 3.5 and Spark 2.0.1

Thanks in advance!

asked Dec 28 '16 by Thagor

1 Answer

It is not so much a mystery. Step by step:

  • Spark uses Pyrolite to convert between Python and Java types.
  • The Java type for bytes is byte[], which is equivalent to Array[Byte] in Scala.
  • You defined the column to be of StringType, therefore Array[Byte] will be converted to a String before being stored in the DataFrame.
  • Arrays in Scala are an ugly Java artifact and, among other problems, have no useful toString method:

    Array(192, 168, 1, 1).map(_.toByte)
    
    Array[Byte] = Array(-64, -88, 1, 1)
    
    Array(192, 168, 1, 1).map(_.toByte).toString
    
    String = [B@6c9fe061
    

    This is where the content you see in the column comes from.

There is no type in Spark SQL that maps directly to Python bytes. Personally I would join plain RDDs, but if you really want to use DataFrames you can use an intermediate BinaryType representation.

from collections import namedtuple

Record = namedtuple("Record", ["url", "content"])

rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()

df.printSchema()
root
 |-- url: string (nullable = true)
 |-- content: binary (nullable = true)

It won't give you a type that can be used natively on the JVM, nor a meaningful string representation:

+-------+----------+
|    url|   content|
+-------+----------+
|none://|[66 6F 6F]|
|none://|[62 61 72]|
+-------+----------+

but is lossless:

df.rdd.map(lambda row: bytes(row.content)).first()
b'foo'

and can be accessed in a Python UDF:

from pyspark.sql.functions import udf
from pyspark.sql import Column
from typing import Union

def decode(col: Union[str, Column], enc: str = "utf-8") -> Column:
    def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
        if bs is not None:
            try:
                return bytes(bs).decode(enc)
            except UnicodeDecodeError:
                pass
    return udf(decode_)(col)

df.withColumn("decoded", decode("content")).show()
+-------+----------+-------+
|    url|   content|decoded|
+-------+----------+-------+
|none://|[66 6F 6F]|    foo|
|none://|[62 61 72]|    bar|
+-------+----------+-------+
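
Since the decoding information lives in the other DataFrame, the same idea extends to a per-row encoding after a join. A minimal sketch, assuming a hypothetical lookup DataFrame with columns url and encoding (names invented for illustration):

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Hypothetical lookup table with one encoding per document.
meta_df = sqlContext.createDataFrame(
    [("none://", "utf-8")], ["url", "encoding"])

def decode_with_(bs, enc):
    # Decode the binary column using the encoding taken from the joined column.
    if bs is not None and enc is not None:
        try:
            return bytes(bs).decode(enc)
        except UnicodeDecodeError:
            pass

decode_with = udf(decode_with_, StringType())

(df.join(meta_df, on="url", how="left")
   .withColumn("decoded", decode_with(col("content"), col("encoding")))
   .show())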
answered by zero323