Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

REST Streaming JSON Output

We have JAX RS implementation which needs to send back JSON output. But the response size is huge. And the client expects the same synchronously. Hence I tried to use StreamingOutput... but the client is not really getting the data in chunks. Below is sample snippet:

Server Side

streamingOutput = new StreamingOutput() {

    @Override
    public void write(OutputStream out) throws IOException, WebApplicationException {
        JsonGenerator jsonGenerator = mapper.getFactory().createGenerator(out);
        jsonGenerator.writeStartArray();
        for(int i=0; i < 10; i++) {
            jsonGenerator.writeStartObject();

            jsonGenerator.writeStringField("Response_State", "Response State - " + i);
            jsonGenerator.writeStringField("Response_Report", "Response Report - " + i);
            jsonGenerator.writeStringField("Error_details", "Error Details - " + i);

            jsonGenerator.writeEndObject();;
            jsonGenerator.flush();

            try {
                Thread.currentThread().sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        jsonGenerator.writeEndArray();

        jsonGenerator.close();

    }
};

return Response.status(200).entity(streamingOutput).build();

Client

HttpClient client = HttpClientBuilder.create().build();
HttpPost post = new HttpPost("http://localhost:8080/AccessData/FetchReport");
post.setHeader("Content-type", "application/json");
ResponseHandler<HttpResponse> responseHandler = new BasicResponseHandler();
StringEntity entity = new StringEntity(jsonRequest); //jsonRequest is       request string
post.setEntity(entity);
HttpResponse response = client.execute(post);
BufferedReader buffReader = new BufferedReader(new      InputStreamReader(response.getEntity().getContent()));
JsonParser jsonParser = new JsonFactory().createParser(buffReader);
while(jsonParser.nextToken() != JsonToken.END_OBJECT) {
    System.out.println(jsonParser.getCurrentName() + ":" +      jsonParser.getCurrentValue());
}
String output;
while((output = buffReader.readLine()) != null) {
    System.out.println(output);
}

In the server side code, I am putting sleep call just to simulate a gap between chunks of data. What I need is that the client should receive chunks of data as and when it is thrown back by the server. But here the client gets the response in entirety always. Any possible solution?

Thanks in advance.

like image 419
Soumen Ghosh Avatar asked Aug 11 '17 12:08

Soumen Ghosh


People also ask

Can you stream JSON?

Concatenated JSON streaming allows the sender to simply write each JSON object into the stream with no delimiters. It relies on the receiver using a parser that can recognize and emit each JSON object as the terminating character is parsed.

What is application stream JSON?

application/stream+json is for server to server/http client (anything that's not a browser) communications. It won't prefix the data and will just use CRLF to split the pieces of data.


1 Answers

It looks like the client side is not implemented correctly: reading the array of the objects using the parser.

Also, I would like to recommend reading and writing a data transfer object instead of low level field-by-field reading and writing.

For the sake of completeness, here is a complete draft example that uses: Jersey 2.25.1, Jetty 9.2.14.v20151106.

Common

ResponseData class

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ResponseData {
    private final String responseState;
    private final String responseReport;
    private final String errorDetails;

    @JsonCreator
    public ResponseData(
        @JsonProperty("Response_State") final String responseState,
        @JsonProperty("Response_Report") final String responseReport,
        @JsonProperty("Error_details") final String errorDetails) {
        this.responseState = responseState;
        this.responseReport = responseReport;
        this.errorDetails = errorDetails;
    }

    public String getResponseState() {
        return this.responseState;
    }

    public String getResponseReport() {
        return this.responseReport;
    }

    public String getErrorDetails() {
        return this.errorDetails;
    }

    @Override
    public String toString() {
        return String.format(
            "ResponseData: responseState: %s; responseReport: %s; errorDetails: %s",
            this.responseState,
            this.responseReport,
            this.errorDetails
        );
    }
}

Service

ServerProgram class

import java.net.URI;
import org.glassfish.jersey.jackson.JacksonFeature;
import org.glassfish.jersey.jetty.JettyHttpContainerFactory;
import org.glassfish.jersey.server.ResourceConfig;

public class ServerProgram {
    public static void main(final String[] args) {
        final URI uri = URI.create("http://localhost:8080/");
        final ResourceConfig resourceConfig = new ResourceConfig(TestResource.class);
        resourceConfig.register(JacksonFeature.class);
        JettyHttpContainerFactory.createServer(uri, resourceConfig);
    }
}

TestResource class

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;

@Path("/")
public class TestResource {
    @GET
    @Produces(MediaType.APPLICATION_JSON)
    public Response getData() {
        final StreamingOutput streamingOutput = new JsonStreamingOutput();
        return Response.status(200).entity(streamingOutput).build();
    }

    private static class JsonStreamingOutput implements StreamingOutput {
        @Override
        public void write(final OutputStream outputStream) throws IOException, WebApplicationException {
            final ObjectMapper objectMapper = new ObjectMapper();
            final JsonFactory jsonFactory = objectMapper.getFactory();
            try (final JsonGenerator jsonGenerator = jsonFactory.createGenerator(outputStream)) {
                jsonGenerator.writeStartArray();

                for (int i = 0; i < 10; i++) {
                    final ResponseData responseData = new ResponseData(
                        "Response State - " + i,
                        "Response Report - " + i,
                        "Error Details - " + i
                    );
                    jsonGenerator.writeObject(responseData);
                    jsonGenerator.flush();

                    try {
                        Thread.currentThread().sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                jsonGenerator.writeEndArray();
            }
        }
    }
}

Client

ClientProgram class

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import org.glassfish.jersey.client.ClientProperties;

public class ClientProgram {
    public static void main(final String[] args) throws IOException {
        Client client = null;
        try {
            client = ClientBuilder.newClient();
            client.property(ClientProperties.READ_TIMEOUT, 10000);

            try (final InputStream inputStream = client
                .target("http://localhost:8080/")
                .request(MediaType.APPLICATION_JSON)
                .get(InputStream.class);
                final BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream)) {
                processStream(bufferedInputStream);
            }
        } finally {
            if (client != null) {
                client.close();
            }
        }
    }

    private static void processStream(final InputStream inputStream) throws IOException {
        final ObjectMapper objectMapper = new ObjectMapper();
        final JsonFactory jsonFactory = objectMapper.getFactory();
        try (final JsonParser jsonParser = jsonFactory.createParser(inputStream)) {
            final JsonToken arrayToken = jsonParser.nextToken();
            if (arrayToken == null) {
                // TODO: Return or throw exception.
                return;
            }

            if (!JsonToken.START_ARRAY.equals(arrayToken)) {
                // TODO: Return or throw exception.
                return;
            }

            // Iterate through the objects of the array.
            while (JsonToken.START_OBJECT.equals(jsonParser.nextToken())) {
                final ResponseData responseData = jsonParser.readValueAs(ResponseData.class);
                System.out.println(responseData);
            }
        }
    }
}

Hope this helps.

like image 88
Sergey Vyacheslavovich Brunov Avatar answered Sep 20 '22 18:09

Sergey Vyacheslavovich Brunov