data=
"""
user date item1 item2
1 2015-12-01 14 5.6
1 2015-12-01 10 0.6
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
2 2015-12-01 30 0.3
2 2015-12-01 89 1.2
2 2015-12-30 70 1.9
2 2015-12-31 20 2.5
3 2015-12-01 19 9.3
3 2015-12-01 40 2.3
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
"""
If I have data like the above, how can I get each user's records from their latest day? I tried to use groupByKey, but I have no idea how to proceed.
val user = data.map {
  case (user, date, item1, item2) => ((user, date), Array(item1, item2))
}.groupByKey()
and then I don't know how to deal with the result. Can anyone give me some suggestions? Thanks a lot :)
I changed my data, and now a user can have several records on the latest day, and I want to get all of them. Thanks :)
The result I want to get is:
user1 (2015-12-02,Array(8,9.4),Array(90,1.3))
user2 (2015-12-31,Array(20,2.5))
user3 (2015-12-02,Array(13,1.4),Array(50,1.0),Array(19,7.8))
Now I have written some code:
import scala.collection.mutable.ArrayBuffer

val data2 = data.trim.split("\\n").drop(1)   // drop the header line
  .map(_.split("\\s+"))
  .map(f => (f(0), ArrayBuffer(f(1), f(2).toInt, f(3).toDouble)))
val data3 = sc.parallelize(data2)
data3.reduceByKey((x, y) =>
  if (x(0).toString.compareTo(y(0).toString) > 0) x
  else if (x(0).toString.compareTo(y(0).toString) < 0) y
  else x ++= y
).foreach(println)
The result is:
(2,ArrayBuffer(2015-12-31, 20, 2.5))
(1,ArrayBuffer(2015-12-02, 8, 9.4, 2015-12-02, 90, 1.3))
(3,ArrayBuffer(2015-12-02, 13, 1.4, 2015-12-02, 50, 1.0, 2015-12-02, 19, 7.8))
Is there anything I can do to improve it? :)
I think your best bet is to map your input data to an RDD of tuples of the form (user, (date, item1, item2)),
so the RDD will be userRdd: RDD[(Int, (Date, Int, Double))].
From here you can create a reducer that takes two tuples and returns the one with the greater date value:
// assuming the date is a java.util.Date; .after means "greater than"
def reduceMaxDate(a: (Date, Int, Double), b: (Date, Int, Double)): (Date, Int, Double) = {
  if (a._1.after(b._1)) a else b
}
From here you can find the max value for each user by calling:
userRdd.reduceByKey(reduceMaxDate)
This will yield the tuple with the max timestamp for each user.
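Putting this together, here is a minimal end-to-end sketch of the approach, assuming the multi-line data string from the question. It keeps the date as a yyyy-MM-dd String (plain string comparison orders those correctly) rather than a java.util.Date, and the names lines and userRdd are illustrative:

// Parse the sample string into (user, (date, item1, item2)) pairs.
val lines = data.trim.split("\\n").drop(1)               // drop the header line
val userRdd = sc.parallelize(lines)
  .map(_.trim.split("\\s+"))
  .map(f => (f(0), (f(1), f(2).toInt, f(3).toDouble)))

// yyyy-MM-dd dates compare correctly as plain strings.
def reduceMaxDate(a: (String, Int, Double), b: (String, Int, Double)): (String, Int, Double) =
  if (a._1 > b._1) a else b

userRdd.reduceByKey(reduceMaxDate).foreach(println)
// e.g. (2,(2015-12-31,20,2.5))

Note that reduceByKey keeps exactly one record per user, so when a user has several records on the latest day only one of them survives; the join-based scripts below return all of them.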
Here are the scripts. The idea is to first reduce each user down to the record with the latest date, then join back on (user, date) so that every record from that day is kept.
For Scala
val data = sc.textFile("file:///home/cloudera/data.txt")
val dataMap = data.map(x => (x.split(" +")(0), x))
val dataReduce = dataMap.reduceByKey((x, y) =>
if(x.split(" +")(1) >= y.split(" +")(1)) x
else y)
val dataUserAndDateKey = data.map(rec => ((rec.split(" +")(0), rec.split(" +")(1)), rec))
val dataReduceUserAndDateKey = dataReduce.map(rec => ((rec._2.split(" +")(0), rec._2.split(" +")(1)), rec._2))
val joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
joinData.map(rec => rec._2._1).foreach(println)
For pyspark
import re
data = sc.textFile("file:///home/cloudera/data.txt")
dataMap = data.map(lambda rec: (re.split(r'\s+', rec)[0], rec))
dataReduce = dataMap.reduceByKey(lambda x, y: x if re.split(r'\s+', x)[1] >= re.split(r'\s+', y)[1] else y)
dataUserAndDateKey = data.map(lambda rec: ((re.split(r'\s+', rec)[0], re.split(r'\s+', rec)[1]), rec))
dataReduceUserAndDateKey = dataReduce.map(lambda rec: ((re.split(r'\s+', rec[1])[0], re.split(r'\s+', rec[1])[1]), rec[1]))
joinData = dataUserAndDateKey.join(dataReduceUserAndDateKey)
for i in joinData.collect(): print(i[1][0])
Here is the output:
3 2015-12-02 13 1.4
3 2015-12-02 50 1.0
3 2015-12-02 19 7.8
2 2015-12-31 20 2.5
1 2015-12-02 8 9.4
1 2015-12-02 90 1.3
You can also use Spark SQL with a HiveContext and data frames.
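For example, here is a minimal sketch of that route, assuming Spark 1.x with a HiveContext bound to sqlContext and the same whitespace-separated data.txt as above; the column names (userid, dt) and the table name records are illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Parse the file into Rows, skipping a header line if one is present.
val rows = sc.textFile("file:///home/cloudera/data.txt")
  .map(_.trim.split("\\s+"))
  .filter(f => f.length == 4 && f(0) != "user")
  .map(f => Row(f(0), f(1), f(2).toInt, f(3).toDouble))

val schema = StructType(Seq(
  StructField("userid", StringType),
  StructField("dt", StringType),
  StructField("item1", IntegerType),
  StructField("item2", DoubleType)))

val records = sqlContext.createDataFrame(rows, schema)
records.registerTempTable("records")

// Keep every record whose date equals that user's latest date.
sqlContext.sql("""
  SELECT r.*
  FROM records r
  JOIN (SELECT userid, MAX(dt) AS max_dt FROM records GROUP BY userid) m
    ON r.userid = m.userid AND r.dt = m.max_dt
""").show()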