Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Convert String to Array to Objects in Observable

I'm trying to use a CloseableHttpAsyncClient to read from an endpoint, marshall the string to an Object (using javax.json) then convert an array on the object into it's individual components:

CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();

client.start();

Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
            .toObservable();

Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
        String stringVal = new String(bb);
        StringReader reader = new StringReader(stringVal);
        JsonObject jobj = Json.createReader(reader).readObject();
        return jobj.getJsonArray("elements");
    })).share();

I need to get the Json Array, then filter the objects of the array:

Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));

How do I convert the Observable<JsonArray> into a ObservableJsonObject>?

Because it's async, I can't use forEach to create some kind of array to buffer the data.

UPDATE:

So Looking at using the CloseableHttpAsyncClient may not be the best solution for what I'm trying to achieve - I realised this morning (in the shower of all things) that I'm trying to process the data asynchronously to then make async calls.

Ideally, making the call to a CloseableHttpClient (sync) and passing the data to the Observable for filtering would be a more ideal approach (I don't need the first call to manage more than one http call).

    CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();

    StringBuffer result = new StringBuffer();

    try {
        HttpGet request = new HttpGet(uri);
        HttpResponse response = client.execute(request);

        BufferedReader rd = new BufferedReader(
                new InputStreamReader(response.getEntity().getContent()));

        String line;
        while ((line = rd.readLine()) != null) {
            result.append(line);
        }
    } catch(ClientProtocolException cpe) { } catch(IOException ioe) {  }

    StringReader reader = new StringReader(result.toString());
    JsonObject jobj = Json.createReader(reader).readObject();
    JsonArray elements = jobj.getJsonArray("elements");

    List<JsonObject> objects = elements.getValuesAs(JsonObject.class);


    Observable<JsonObject> shareable = Observable.from(objects).share();

    Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
    Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
    Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));


    firstStream.subscribe(record -> {
        //connect to SOTS/Facebook and store the results
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Json.createWriter(baos).writeObject(record);
        System.out.println(baos.toString());
    });

    secondStream.subscribe(record -> {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Json.createWriter(baos).writeObject(record);
        System.out.println(baos.toString());
    });

    thirdStream.subscribe(record -> {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        Json.createWriter(baos).writeObject(record);
        System.out.println(baos.toString());
    });
like image 708
user2266870 Avatar asked Oct 30 '22 23:10

user2266870


1 Answers

Try this code:

    String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}";

    Observable.just(myjson)
            .map(jsonStr -> new StringReader(myjson))
            .map(reader -> Json.createReader(reader).readObject())
            .map(jobj -> jobj.getJsonArray("elements"))
            .map(elements -> elements.toArray(new JsonObject[elements.size()]))
            .flatMap(jsonObjects -> Observable.from(jsonObjects))
            .subscribe(
                    (jsonObject) -> System.out.println(jsonObject.getString("text")),
                    throwable -> throwable.printStackTrace(),
                    () -> System.out.println("On complete"));

Result:

07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj1
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj2
07-22 12:19:22.362 8032-8032/com.mediamanagment.app I/System.out﹕ Obj3

NOTE:
You should use this dependency:

compile 'org.glassfish:javax.json:1.0.4'

Instead this:

compile 'javax.json:javax.json-api:1.0'

If you will use 'javax.json:javax.json-api:1.0' you will get javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found at step:

.map(reader -> Json.createReader(reader).readObject())

Thereof, please, use 'org.glassfish:javax.json:1.0.4'

UPDATE: Also, instead of

.flatMap(jsonObjects -> Observable.from(jsonObjects))

You can use flatMapIterable( ):

.flatMapIterable(jsonObjects -> jsonObjects)
like image 51
Danila Grigorenko Avatar answered Nov 15 '22 04:11

Danila Grigorenko