
Sending a stream of documents to a Jersey @POST endpoint

I want to send a stream of many documents to a web service in a single request. This would save on HTTP request/response overhead and keep the focus on the documents themselves.

In Python you can do something like this:

import json
import requests

r = requests.post('https://stream.twitter.com/1/statuses/filter.json',
    data={'track': 'requests'}, auth=('username', 'password'),
    stream=True)

for line in r.iter_lines():
    if line:  # filter out keep-alive new lines
        print(json.loads(line))

I'm looking for an example of streaming a request to a Jersey REST API. I was hoping to see both the client side and the server side to show it working, but I'm struggling to find an example out there.

Ideally, the example would show:

Client:
  Open request
  Iterate over huge document list
    Write document to open request stream
  Close request

Server:
  @POST method
    Open entity stream
    Iterate over entity stream while next document is available
        Process document
    Close entity stream

If we get this right, the server will be processing entities while the client is still sending them. Huge win!

Nicholas DiPiazza asked Jul 27 '16 12:07



1 Answer

One of the simplest ways to accomplish this is to let Jersey hand the POST handler an InputStream for the HTTP request body. The method can then use that InputStream with the JSON parser of your choice to parse and handle each object as it arrives.

In the following example, a Jackson ObjectReader produces a MappingIterator, which parses and processes each Person document in the array as it is delivered to the server:

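// javax.ws.rs is the JAX-RS 2.x namespace (Jersey 2.x); Jersey 3.x uses jakarta.ws.rs.
import java.io.IOException;
import java.io.InputStream;

import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

/**
 * Parse and process an arbitrarily large JSON array of Person documents.
 * This is a nested class (hence static); the imports belong to the enclosing file.
 */
@Path("persons")
public static class PersonResource {

    // ObjectReader is immutable and thread-safe, so one shared instance is enough
    private static final ObjectReader reader = new ObjectMapper().readerFor(Person.class);

    @Path("inputstream")
    @Consumes("application/json")
    @POST
    public void inputstream(final InputStream is) throws IOException {
        // MappingIterator is Closeable; it reads one Person at a time from the stream
        try (MappingIterator<Person> persons = reader.readValues(is)) {
            while (persons.hasNext()) {
                final Person person = persons.next();
                // process
                System.out.println(person);
            }
        }
    }
}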
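To make the read loop itself easier to see, here is a minimal sketch of the same idea using only the JDK, with no Jersey or Jackson involved. It assumes a different framing than the JSON array above: one document per line, as in the Python snippet from the question. The class name LineDelimitedReader and the method processEach are hypothetical names chosen for this illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LineDelimitedReader {

    /**
     * Reads one document per line and hands each to the handler as soon as
     * its line is complete. Returns the number of documents handled.
     */
    public static int processEach(InputStream in, Consumer<String> handler) throws IOException {
        int count = 0;
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // skip blank keep-alive lines
                }
                handler.accept(line);
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a request body: two documents separated by a blank line
        byte[] body = "{\"first\":\"Tracy\"}\n\n{\"first\":\"Liz\"}\n"
                .getBytes(StandardCharsets.UTF_8);
        List<String> seen = new ArrayList<>();
        int n = processEach(new ByteArrayInputStream(body), seen::add);
        System.out.println(n + " documents: " + seen);
    }
}
```

In a Jersey resource, the InputStream parameter of the @POST method would take the place of the ByteArrayInputStream used here, and the handler would parse each line with whatever JSON library you prefer.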

Likewise, the Jersey client framework can send a stream of documents when configured with a Jackson ObjectMapper. The following example demonstrates this with the Jersey Test Framework. The client posts an iterator of Person documents, which can be arbitrarily large:

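// Assumption: assertThat comes from AssertJ; the original does not say which assertion library is used.
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.Set;

import javax.ws.rs.Produces;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ContextResolver;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.test.JerseyTest;
import org.junit.Test;

import com.fasterxml.jackson.databind.ObjectMapper;

public class JacksonStreamingTest extends JerseyTest {

    @Override
    protected Application configure() {
        return new ResourceConfig(PersonResource.class, ObjectMapperProvider.class);
    }

    /**
     * Registers the application {@link ObjectMapper} as the JAX-RS provider for application/json
     */
    @Provider
    @Produces(MediaType.APPLICATION_JSON)
    public static class ObjectMapperProvider implements ContextResolver<ObjectMapper> {

        private static final ObjectMapper mapper = new ObjectMapper();

        @Override
        public ObjectMapper getContext(final Class<?> objectType) {
            return mapper;
        }
    }

    @Override
    protected void configureClient(final ClientConfig config) {
        config.register(ObjectMapperProvider.class);
    }

    @Test
    public void test() {
        final Set<Person> persons = Collections.singleton(Person.of("Tracy", "Jordan"));
        // Posting the Iterator (not the Set) lets Jackson write the elements one by one
        final Response response = target("persons/inputstream").request().post(Entity.json(persons.iterator()));
        // The resource method returns void, so a successful call yields 204 No Content
        assertThat(response.getStatus()).isEqualTo(204);
    }
}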
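If you want to see the transport mechanics without Jersey in the way, the following is a rough end-to-end sketch using only JDK classes: the built-in com.sun.net.httpserver.HttpServer stands in for the Jersey resource, and HttpURLConnection in chunked streaming mode stands in for the Jersey client. It is not the Jersey API. The class name ChunkedUploadSketch, the /documents path, and the one-document-per-line framing are all assumptions made for this illustration.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class ChunkedUploadSketch {

    /** Starts a throwaway server, streams the documents to it, and returns what the server read. */
    public static List<String> roundTrip(Iterator<String> documents) throws IOException {
        List<String> received = new CopyOnWriteArrayList<>();
        // Port 0 asks the OS for any free port on the loopback interface
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/documents", exchange -> {
            // Server side: read the request body one line (one document) at a time
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    received.add(line); // "process" the document
                }
            }
            exchange.sendResponseHeaders(204, -1); // -1 means no response body
            exchange.close();
        });
        server.start();
        try {
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/documents");
            HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection(Proxy.NO_PROXY);
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            // Chunked mode sends the body as it is written instead of buffering it all;
            // 0 selects the default chunk length
            conn.setChunkedStreamingMode(0);
            try (OutputStream out = conn.getOutputStream()) {
                while (documents.hasNext()) {
                    out.write((documents.next() + "\n").getBytes(StandardCharsets.UTF_8));
                    out.flush(); // push this document onto the wire now
                }
            }
            int status = conn.getResponseCode();
            if (status != 204) {
                throw new IOException("unexpected status " + status);
            }
            return received;
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> docs = Arrays.asList("{\"id\":1}", "{\"id\":2}", "{\"id\":3}");
        System.out.println(roundTrip(docs.iterator()));
    }
}
```

The key line on the client is setChunkedStreamingMode: without it, HttpURLConnection buffers the whole body so it can compute Content-Length, and nothing reaches the server until the stream is closed. With it, the handler can start reading documents while the client is still writing later ones.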
jdgilday answered Oct 14 '22 21:10