I have a variable "myrdd" that is an Avro file with 10 records, loaded through hadoopFile.
When I do
myrdd.first._1.datum.getName()
I can get the name. The problem is that I have 10 records in "myrdd". When I do:
myrdd.map(x => {println(x._1.datum.getName())})
it does not work and prints out a weird object a single time. How can I iterate over all records?
Here is a log from a session using spark-shell with a similar scenario.
Given
scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
Your issue looks like this:
scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
so map just returns another RDD (the function is not applied immediately; it is applied "lazily" when you actually iterate over the result).
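For example, a quick way to see the laziness in the same session (count() is only used here as an action to force evaluation):
val mapped = persons.map(t => println(t))   // nothing is printed yet, only a new RDD[Unit] is defined
mapped.count()                              // an action forces evaluation, so the println side effect runs now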
So when you materialize it (using collect()) you get a "normal" collection:
scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
over which you can map. Note that in this case you have a side effect in the closure passed to map (the println); the result of println is Unit:
scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())
Same result if collect is applied at the end:
scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())
But if you just want to print the rows, you can simplify it by using foreach:
scala> persons.foreach(t => println(t))
[Justin,19]
As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect is required as well:
persons.collect().foreach(t => println(t))
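Applied back to the myrdd from the question, the same pattern would look roughly like this (a sketch: the exact key/value types depend on how the Avro file was loaded, which is not shown in the question):
// bring the (key, value) pairs to the driver, then print each record's name
myrdd.collect().foreach(pair => println(pair._1.datum.getName()))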
Notes
If the result is too large to collect() on the driver, you can use toLocalIterator instead, which gives you an Iterator over the elements rather than an Array.
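A minimal sketch of that, assuming the persons DataFrame from above (.rdd exposes the underlying RDD[Row]):
persons.rdd.toLocalIterator.foreach(println)   // fetches one partition at a time to the driver and prints each Row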
Update
As for filtering: the placement of collect is "bad" if you apply filters after collect that could have been applied before it.
For example, these expressions give the same result:
scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]
scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
[Michael,29]
[Andy,30]
but the second case is worse, because that filter could have been applied before collect.
The same applies to any kind of aggregation as well.
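For example, a sketch with the same persons DataFrame, where the aggregation runs distributed and only the small result is collected:
// aggregate on the cluster first, then bring only the per-age counts to the driver
persons.groupBy("age").count().collect().foreach(println)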