I'm looking at a mystery. I have a bunch of long documents available as Python bytestrings (b"I'm a byte string") in an RDD. Now I convert this RDD to a DataFrame in order to join it to another DataFrame. I do that like this:
Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)
Data_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", StringType(), True),
])
Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)
Data_DF.show(5)
+--------------------+-----------+
| URI| Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows
These short "[B@10628e42" strings seem fairly useless to me and are probably some kind of pointer. The bytestrings are still 'intact' in the RDD, because I can still access them there, so the problem must occur in the conversion from RDD to DataFrame. I then tried to store the bytestrings in fields of other types, namely ByteType() and BinaryType(). Neither works; the bytestrings are rejected with these error messages:
TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>
But it gets even weirder. I set up a small-scale experiment:
ByteStrings = [b'one', b'two', b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))

DF2_schema = StructType([
    StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings, schema=DF2_schema)
DF_ByteStrings.show()
The small bytestrings are not allowed in a StringType column either. When I try to run this I get this error message:
StructType can not accept object b'one' in type <class 'bytes'>
When I try to let Spark infer a type, it also fails, with this message:
TypeError: Can not infer schema for type: <class 'bytes'>
So, any ideas how I could store bytestrings in a DataFrame without having to .decode() them? Decoding is something I can only do after joining the two DataFrames, because the other one holds the decoding information.
I use Python 3.5 and Spark 2.0.1.
Thanks in advance!
It is not so much a mystery. Step by step:
- bytes is byte[], which is equivalent to Array[Byte] in Scala.
- Since the column is declared as StringType, the Array[Byte] will be converted to String before being stored in the DataFrame.
- Arrays in Scala are an ugly Java artifact and, among other problems, have no useful toString method:
Array(192, 168, 1, 1).map(_.toByte)
Array[Byte] = Array(-64, -88, 1, 1)
Array(192, 168, 1, 1).map(_.toByte).toString
String = [B@6c9fe061
This is how you get the content of the column.
There is no type in Spark SQL that maps directly to Python bytes. Personally I would just join the RDDs.
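At the RDD level that could look something like this (a minimal sketch: Encodings_RDD and its (URI, encoding) layout are assumptions, not part of the question's pipeline):
# Assumed: Data_RDD yields (uri, raw_bytes) pairs and Encodings_RDD yields (uri, encoding) pairs.
Joined_RDD = Data_RDD.join(Encodings_RDD)                        # (uri, (raw_bytes, encoding))
Decoded_RDD = Joined_RDD.mapValues(lambda v: v[0].decode(v[1]))  # (uri, decoded_text)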
But if you really want to use DataFrames, you can use an intermediate BinaryType representation:
from collections import namedtuple
Record = namedtuple("Record", ["url", "content"])
rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()  # bytearray is inferred as BinaryType
df.printSchema()
root
|-- url: string (nullable = true)
|-- content: binary (nullable = true)
It won't give you something that can be used natively (on the JVM), nor a meaningful string representation:
+-------+----------+
| url| content|
+-------+----------+
|none://|[66 6F 6F]|
|none://|[62 61 72]|
+-------+----------+
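As an aside, if you just need a readable rendering for inspection, Spark's built-in hex function accepts binary columns; a quick sketch (the extra column name is mine):
from pyspark.sql.functions import hex as sql_hex

# Renders the binary content as a hex string, e.g. 666F6F for b'foo'.
df.withColumn("content_hex", sql_hex("content")).show()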
but is lossless:
df.rdd.map(lambda row: bytes(row.content)).first()
b'foo'
and can be accessed in a Python udf:
from pyspark.sql.functions import udf
from pyspark.sql import Column
from typing import Union

def decode(col: Union[str, Column], enc: str = "utf-8") -> Column:
    def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
        # Returns None for null input or undecodable bytes.
        if bs is not None:
            try:
                return bytes(bs).decode(enc)
            except UnicodeDecodeError:
                pass
    return udf(decode_)(col)
df.withColumn("decoded", decode("content")).show()
+-------+----------+-------+
| url| content|decoded|
+-------+----------+-------+
|none://|[66 6F 6F]| foo|
|none://|[62 61 72]| bar|
+-------+----------+-------+
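Since the decoding information lives in your second DataFrame, the udf can also take the encoding from a joined column. A minimal sketch, where meta_df, its columns, and the join key are my assumptions:
from pyspark.sql.functions import udf

# Hypothetical metadata DataFrame carrying a per-URL encoding.
meta_df = sqlContext.createDataFrame([("none://", "utf-8")], ["url", "enc"])

# Default udf return type is StringType; nulls pass through as None.
decode_with = udf(lambda bs, enc: bytes(bs).decode(enc) if bs is not None else None)

df.join(meta_df, "url") \
    .withColumn("decoded", decode_with("content", "enc")) \
    .show()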