Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to group by and return list in rx java

Tags:

rx-java

I'm trying to create group list based on certain conditions in Rxjava.

Below is my response:

{  
   "dates":[  
      {  
         "date":18,
         "value":"1"
      },
      {  
         "date":18,
         "value":"2"
      },
      {  
         "date":18,
         "value":"3"
      },
      {  
         "date":19,
         "value":"1"
      },
      {  
         "date":19,
         "value":"2"
      },
      {  
         "date":19,
         "value":"3"
      },
      {  
         "date":19,
         "value":"4"
      }
   ]
}

How can group by the values 18 [value 1, value 2, value 3, highestvalue= 3, lowestvalue = 1] 19[value 1, value 2, value 3, value 4,highestvalue= 4, lowestvalue = 1] using Rxjava

Note: I can create using for loop but the response will be fetched from the server and since it is returning observable thought of using rx java functionality.

Anyhelp will be really appreciated.

Thank you, Shanthi

like image 233
Shanthi Raman Avatar asked Sep 19 '17 03:09

Shanthi Raman


Video Answer


2 Answers

Look into group by functionality.

Here's the example for anyone who's curious:

class DateModel implements Comparable<DateModel>{
    Integer date;
    Integer value;

    public DateModel(int date, int value){
        this.date = date; 
        this.value = value;
    }

    @Override
    public int compareTo(DateModel o) {
        return value.compareTo(o.value);
    }
}

And then if we have to aggregate a list of these model objects:

// example list
List<DateModel> dateList = Arrays.asList(
  new DateModel(18,1),
  new DateModel(18,2),
  new DateModel(18,3),
  new DateModel(19,1),
  new DateModel(19,2),
  new DateModel(19,3),
  new DateModel(19,4)
);

// the following observable will give you an emission for every grouping
// for the example data above, you should get two emissions (group 18 and 19)
Observable<PriorityQueue<DateModel>> observable = 
  Observable.from(dateList)
    .groupBy(dateModel -> dateModel.date)
    .flatMap(groups -> groups.collect(PriorityQueue::new, PriorityQueue::add));

PriorityQueue was just an example of the structure used for collecting. If you pop from queue, you'll get 18-1, 18-2, 18-3 etc (in the order you asked). You can use a different structure for the purposes of only finding the max & min.

like image 186
Jon Avatar answered Oct 11 '22 00:10

Jon


Inspired by @Jon 's answer which works. Here's a full demo code for Rxjava2 and output.

  • Observable#collect() for Observable
  • Flowable#parallel() + Single#blockingGet() for Flowable

The output :

----------------------byCollect
[2017/11/16 20:42:43.548 CST][   1 -                           main] - flatMapSingle :  : 1
[2017/11/16 20:42:43.590 CST][   1 -                           main] - flatMapSingle :  : 2
[2017/11/16 20:42:43.591 CST][   1 -                           main] - flatMapSingle :  : 0
[2017/11/16 20:42:43.592 CST][   1 -                           main] - subscribe : onNext : {0=[3, 6, 9]}
[2017/11/16 20:42:43.593 CST][   1 -                           main] - subscribe : onNext : {1=[1, 4, 7]}
[2017/11/16 20:42:43.593 CST][   1 -                           main] - subscribe : onNext : {2=[2, 5, 8]}
[2017/11/16 20:42:43.597 CST][   1 -                           main] - subscribe : onComplete : 
----------------------byParallelAndBlockingGet
[2017/11/16 20:42:43.629 CST][  13 -      RxComputationThreadPool-1] - flatMap :  : 1
[2017/11/16 20:42:43.629 CST][  15 -      RxComputationThreadPool-3] - flatMap :  : 0
[2017/11/16 20:42:43.629 CST][  14 -      RxComputationThreadPool-2] - flatMap :  : 2
[2017/11/16 20:42:43.632 CST][  15 -      RxComputationThreadPool-3] - subscribe : onNext : {0=[3, 6, 9]}
[2017/11/16 20:42:43.632 CST][  15 -      RxComputationThreadPool-3] - subscribe : onNext : {1=[1, 4, 7]}
[2017/11/16 20:42:43.633 CST][  15 -      RxComputationThreadPool-3] - subscribe : onNext : {2=[2, 5, 8]}
[2017/11/16 20:42:43.633 CST][  15 -      RxComputationThreadPool-3] - subscribe : onComplete : 

The source : Demo.java

import io.reactivex.*;
import io.reactivex.Observable;
import io.reactivex.schedulers.*;

import java.time.*;
import java.time.format.*;
import java.util.*;

/**
 *   List<Integer>              // [1..9]
 *   ->
 *   Map<Integer,List<Integer> //  {0: [3,6,9],  1: [1,4,7],  2: [2,5,8] }
 */
public class Demo {
    public static void main(String[] args) throws InterruptedException {
        byCollect();
        byParallelAndBlockingGet();
    }

    public static void byCollect() throws InterruptedException {
        System.out.println("----------------------byCollect");
        Observable.range(1, 9)
                .groupBy(i -> i % 3)
                .flatMapSingle(f -> {  // GroupedObservable<Integer, List<Integer>>

                    // Look output : all runs on same thread,
                    print("flatMapSingle : ", f.getKey());

                    // "onComplete" has not been triggered.
                    // blockingGet will block current thread.
                    //return Observable.just(Collections.singletonMap(f.getKey(), f.toList().blockingGet()))

                    return f.collect(
                            // (Callable<Map<Integer, List<Integer>>>)
                            () -> Collections.singletonMap(f.getKey(), new ArrayList<Integer>()),

                            // (BiConsumer<Map<Integer, List<Integer>>, Integer>)
                            (m, i) -> m.get(f.getKey()).add(i)
                    );

                })
                .subscribe(
                        i -> print("subscribe : onNext", i),
                        err -> print("subscribe : onError", err),
                        () -> print("subscribe : onComplete", "")
                )
        ;
    }

    public static void byParallelAndBlockingGet() throws InterruptedException {
        System.out.println("----------------------byParallelAndBlockingGet");
        Flowable.range(1, 9)
                .groupBy(i -> i % 3)
                .parallel()  // There's no `parallel` method on `Observable` class
                .runOn(Schedulers.computation())  // Important!!!
                .flatMap(f -> { // ParallelFlowable<GroupedFlowable<Integer, List<Integer>>
                    // Look output : runs on different thread each.
                    print("flatMap : ", f.getKey());
                    return Flowable.just(Collections.singletonMap(f.getKey(), f.toList().blockingGet()));
                })
                .sequential()
                .subscribe(
                        i -> print("subscribe : onNext", i),
                        err -> print("subscribe : onError", err),
                        () -> print("subscribe : onComplete", "")
                )
        ;
        Thread.sleep(500);
    }

    public static void print(String step, Object data) {
        ZonedDateTime zdt = ZonedDateTime.now();
        String now = zdt.format(DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss.SSS z"));
        System.out.printf("[%s][%4d - %30s] - %10s : %s%n",
                now,
                Thread.currentThread().getId(),
                Thread.currentThread().getName(),
                step,
                data
        );
    }
}
like image 44
btpka3 Avatar answered Oct 10 '22 23:10

btpka3