Strange behavior when using toDF() function to transform RDD to DataFrame in PySpark

I am new to Spark. When I use the toDF() function to convert an RDD to a DataFrame, it seems to execute the transformations like map() that I wrote before it. I wonder whether toDF() in PySpark is a transformation or an action.

I created a simple RDD and a simple function that prints its input, just as a test, and called toDF() after map(). The output suggests toDF() runs the mapped function on part of the data. Then, when I show() the resulting DataFrame, toDF() acts like a transformation and the function's output appears again.

>>> a = sc.parallelize([(1,),(2,),(3,)])
>>> def f(x):
...     print(x[0])
...     return (x[0] + 1, )
...
>>> b = a.map(f).toDF(["id"])
2
1
>>> b = a.map(f).toDF(["id"]).show()
2
1
1
2
3
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+

Could someone tell me why the toDF() function in PySpark acts like both an action and a transformation? Thanks a lot.

PS: In Scala, toDF acts like a transformation in my case.

asked Oct 31 '18 by xking



1 Answer

That's not strange. Since you didn't provide the schema, Spark has to infer it from the data. When the input is an RDD, toDF() calls SparkSession._createFromRDD and subsequently SparkSession._inferSchema, which, if samplingRatio is missing, will evaluate up to 100 rows:

first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")


if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")

Now the only puzzle left is why it doesn't evaluate exactly one record. After all, in your case first is not empty and doesn't contain None.

That's because first is implemented through take, which doesn't guarantee that an exact number of items will be evaluated. If the first partition doesn't yield the required number of items, it iteratively increases the number of partitions to scan. Please check the take implementation for details.
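
You can provoke this with the same f as in the question (a sketch, assuming a local sc; exactly which records get printed depends on how the data lands in partitions):

# With 4 slices for 3 records the first partition may be empty, so
# take/first comes up short and rescans more partitions, running f
# on several records before a single result is returned
a = sc.parallelize([(1,), (2,), (3,)], numSlices=4)
a.map(f).first()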

If you want to avoid this, you should use createDataFrame and provide the schema, either as a DDL string:

spark.createDataFrame(a.map(f), "val: integer")

or an equivalent StructType.
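
For reference, a sketch of the StructType equivalent of the DDL string above:

from pyspark.sql.types import StructType, StructField, IntegerType

# An explicit schema means no rows have to be evaluated for inference
schema = StructType([StructField("val", IntegerType())])
spark.createDataFrame(a.map(f), schema)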

You won't find any similar behavior in the Scala counterpart, because it doesn't use schema inference in toDF. It either retrieves the corresponding schema from the Encoder (which is fetched using Scala reflection) or doesn't allow the conversion at all. The closest similar behavior is inference on an input source like CSV or JSON:

spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))

answered Nov 09 '22 by 10465355