How to iterate over records in Spark (Scala)?

I have a variable "myrdd", an RDD with 10 records, loaded from an Avro file through hadoopFile.
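
For reference, the load looks roughly like this (GenericRecord is a simplification here; the real record class is a generated Avro class that exposes getName(), and the path is a placeholder):

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// Assumption: the file is read with the "old" mapred Avro API, which yields
// (AvroWrapper[record], NullWritable) pairs -- hence the x._1.datum access below.
val myrdd = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](
  "path/to/records.avro")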

When I do

myrdd.first._1.datum.getName()

I can get the name. Problem is, I have 10 records in "myrdd". When I do:

myrdd.map(x => {println(x._1.datum.getName())})

it does not work and prints out a weird object a single time. How can I iterate over all records?

asked Oct 09 '15 by Rolando


1 Answer

Here is a log from a session using spark-shell with a similar scenario.

Given

scala> persons
res8: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> persons.first
res7: org.apache.spark.sql.Row = [Justin,19]
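
(If you want to reproduce this session, a small DataFrame like the following will do; how persons was built is not shown above, so this is just one way to construct comparable data in spark-shell, Spark 1.x.)

// Sketch: three rows, ordered so that .first returns [Justin,19] as in the session.
case class Person(name: String, age: Int)
val persons = sqlContext.createDataFrame(Seq(
  Person("Justin", 19),
  Person("Michael", 29),
  Person("Andy", 30)
))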

Your issue looks like this:

scala> persons.map(t => println(t))
res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]

so map just returns another RDD: the function is not applied immediately, it is applied "lazily" when you actually iterate over (materialize) the result.
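
You can see this laziness in isolation with a toy RDD (a minimal sketch, independent of the persons data):

val nums = sc.parallelize(Seq(1, 2, 3))
// Nothing is printed here: map only records the transformation.
val mapped = nums.map { n => println(s"mapping $n"); n * 2 }
// The side effect runs only when an action forces the evaluation.
mapped.collect()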

So when you materialize (using collect()) you get a "normal" collection:

scala> persons.collect()
res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])

over which you can map. Note that in this case you have a side effect in the closure passed to map (the println); the result of println is Unit:

scala> persons.collect().map(t => println(t))
[Justin,19]
res5: Array[Unit] = Array(())

Same result if collect is applied at the end:

scala> persons.map(t => println(t)).collect()
[Justin,19]
res19: Array[Unit] = Array(())

But if you just want to print the rows, you can simplify it by using foreach:

scala> persons.foreach(t => println(t))
[Justin,19]

As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs in a cluster, collect is required as well, because otherwise the println calls run on the executors and their output ends up in the executor logs rather than on the driver:

persons.collect().foreach(t => println(t))

Notes

  • The same behaviour can be observed in the Iterator class (see the sketch below).
  • The output of the session above has been reordered.
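
A minimal sketch of the same laziness with a plain Scala Iterator (no Spark involved):

// map on an Iterator is lazy as well: nothing is printed yet.
val it = Iterator(1, 2, 3).map { n => println(n); n }
// The side effect runs only when the iterator is consumed.
it.toList   // prints 1, 2, 3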

Update

As for filtering: the placement of collect is "bad" if you apply filters after collect that could have been applied before it.

For example, these expressions give the same result:

scala> persons.filter("age > 20").collect().foreach(println)
[Michael,29]
[Andy,30]

scala> persons.collect().filter(r => r.getInt(1) > 20).foreach(println)
[Michael,29]
[Andy,30]

but the second case is worse, because the filter could have been applied before collect, reducing the amount of data pulled to the driver.

The same applies to any type of aggregation as well.
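
As a small sketch of that point, compute the aggregation on the cluster instead of on a collected array (same persons DataFrame as above):

// Preferred: filter and count are executed by Spark.
persons.filter("age > 20").count()

// Works, but pulls every row to the driver before counting locally.
persons.collect().count(r => r.getInt(1) > 20)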

answered Sep 28 '22 by Beryllium