I am new to Spark. When I use the toDF() function to convert an RDD to a DataFrame, it seems to execute the transformation functions such as map() that I wrote before it. I'm wondering whether toDF() in PySpark is a transformation or an action.
I created a simple RDD and used a simple function to print its value, just as a test, and called toDF() after map(). The result seems to run the function in map() partially. And when I show() the resulting DataFrame, toDF() acts like a transformation and outputs the result again.
>>> a = sc.parallelize([(1,),(2,),(3,)])
>>> def f(x):
...     print(x[0])
...     return (x[0] + 1, )
...
>>> b = a.map(f).toDF(["id"])
2
1
>>> b = a.map(f).toDF(["id"]).show()
2
1
1
2
3
+---+
| id|
+---+
| 2|
| 3|
| 4|
+---+
Could someone tell me why the toDF() function in PySpark acts both like an action and like a transformation? Thanks a lot.
PS: In Scala, toDF acts like a transformation in my case.
toDF() is a method in PySpark used to create a DataFrame from an RDD. Once the RDD has been converted into a DataFrame, the data becomes more organized and easier to analyze.
Converting a Spark RDD to a DataFrame can be done with toDF(), with createDataFrame(), or by transforming an RDD of Row objects into a DataFrame.
In Scala, toDF converts a typed Dataset into an untyped DataFrame and is documented as a basic action. Internally, the no-argument toDF creates a Dataset[Row] using the Dataset's SparkSession and QueryExecution, with RowEncoder as the encoder.
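As a quick illustration of these conversion paths (a minimal sketch; the RDD contents and the column names id and label are just placeholders):

from pyspark.sql import Row

rdd = sc.parallelize([(1, "a"), (2, "b")])

# 1. toDF() called on the RDD itself (column types are inferred from the data)
df1 = rdd.toDF(["id", "label"])

# 2. createDataFrame() called on the SparkSession
df2 = spark.createDataFrame(rdd, ["id", "label"])

# 3. an RDD of Row objects passed to createDataFrame()
df3 = spark.createDataFrame(rdd.map(lambda t: Row(id=t[0], label=t[1])))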
That's not strange. Since you didn't provide the schema, Spark has to infer it based on the data. If an RDD is the input, it will call SparkSession._createFromRDD and subsequently SparkSession._inferSchema, which, if samplingRatio is missing, will evaluate up to 100 rows:
first = rdd.first()
if not first:
    raise ValueError("The first row in RDD is empty, "
                     "can not infer schema")
if type(first) is dict:
    warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                  "Use pyspark.sql.Row instead")

if samplingRatio is None:
    schema = _infer_schema(first, names=names)
    if _has_nulltype(schema):
        for row in rdd.take(100)[1:]:
            schema = _merge_type(schema, _infer_schema(row, names=names))
            if not _has_nulltype(schema):
                break
        else:
            raise ValueError("Some of types cannot be determined by the "
                             "first 100 rows, please try again with sampling")
Now the only puzzle left is why it doesn't evaluate exactly one record. After all, in your case first is not empty and doesn't contain None.
That's because first is implemented through take, which doesn't guarantee that an exact number of items will be evaluated. If the first partition doesn't yield the required number of items, it iteratively increases the number of partitions to scan. Please check the take implementation for details.
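You can make this visible with a hypothetical variation of your example: spread the three records over more partitions than there are records, so the first partition scanned by first() is empty and Spark scans several partitions in a follow-up job. Exactly how many records get evaluated depends on how the data lands in partitions, so treat the output as illustrative:

# 3 records over 4 partitions: the first partition is empty, so take(1)/first()
# launches a follow-up job over several partitions and f may print more than once
c = sc.parallelize([(1,), (2,), (3,)], 4).map(f)
c.first()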
If you want to avoid this, you should use createDataFrame and provide the schema, either as a DDL string:
spark.createDataFrame(a.map(f), "val: integer")
or as an equivalent StructType.
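For reference, a sketch of the equivalent StructType (the column name val just mirrors the DDL string above):

from pyspark.sql.types import StructType, StructField, IntegerType

schema = StructType([StructField("val", IntegerType(), True)])
spark.createDataFrame(a.map(f), schema)

Because the schema is supplied up front, nothing is evaluated until you call an action such as show().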
You won't find any similar behavior in the Scala counterpart, because it doesn't use schema inference in toDF. It either retrieves the corresponding schema from the Encoder (which is fetched using Scala reflection), or doesn't allow the conversion at all. The closest similar behavior is inference on an input source like CSV or JSON:
spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))