Retrofit2 and streaming serialized objects

Tags:

gson

retrofit2

I am using Retrofit2. So far I have used it to send and receive java.util.List<MyCustomClass>, and it works really well. However, I am wondering what would happen with really big lists. Is there a way to make a more "streaming" interface?

For example, I could pass an Iterator<MyCustomClass> to the API instead of a List and create each next instance on the fly to save memory when sending data to the API. The same Iterator<MyCustomClass> could be used for the opposite direction.

Is there a way to achieve that?

I can see that Gson is able to do streaming serialization and deserialization. Can Retrofit utilize that?

Edit: Clarified question a bit.

K.H. asked Apr 22 '26 01:04


1 Answer

An interesting question. First of all, you have to take into account that List and Iterable differ semantically, especially where I/O is involved. From the I/O perspective, a list collects all elements into an in-memory object, so the underlying resource can be closed immediately afterwards. I think Gson does not support iterables because they are too lazy: they would defer reading elements from input streams, yet by design an iterable must be able to return a new iterator on every call, and a stream-backed iterable cannot create a new iterator without reading the I/O resource again. This is where the confusion comes from.

I recently solved a very similar problem for the Spring Framework (the MVC module) and Java 8 streams, and I seem to have a solution for Retrofit too. However, Retrofit has to be used very carefully here to avoid resource leaks. Note that you can't use iterables, but you can use iterators (Iterator): like Java 8 streams (Stream) and Java I/O streams (InputStream and OutputStream), they cannot be reused. Keeping that difference in mind, you can still work with Iterator instances.
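The single-use property is easy to see with the JDK alone, without Gson or Retrofit. In this minimal sketch (the class and method names are hypothetical), an iterator over a lazily read Reader is exhausted after one pass, and a Stream rejects a second terminal operation.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

final class SingleUseDemo {

    private SingleUseDemo() {
    }

    // BufferedReader.lines() is lazy: lines are read only as the iterator advances
    static Iterator<String> linesOf(final String text) {
        return new BufferedReader(new StringReader(text)).lines().iterator();
    }

    // Drains an iterator into a list; draining the same iterator again yields nothing
    static List<String> drain(final Iterator<String> iterator) {
        final List<String> result = new ArrayList<>();
        while ( iterator.hasNext() ) {
            result.add(iterator.next());
        }
        return result;
    }

    // Returns true if a second terminal operation on the same Stream is rejected
    static boolean secondTerminalOperationFails(final Stream<String> stream) {
        stream.count();
        try {
            stream.count();
            return false;
        } catch ( final IllegalStateException ex ) {
            return true;
        }
    }

}
```

A list, by contrast, could be iterated any number of times, but only because all of its elements already sit in memory.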

If you want to experiment further, here is a simple static HTTP web server in Python:

from http.server import BaseHTTPRequestHandler, HTTPServer

PORT_NUMBER = 8080

class MyHandler(BaseHTTPRequestHandler):

    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        # Send the same JSON array for every request
        self.wfile.write(b'["foo","bar","baz"]')

server = HTTPServer(('', PORT_NUMBER), MyHandler)
try:
    print('Started httpserver on port', PORT_NUMBER)
    server.serve_forever()
except KeyboardInterrupt:
    print('^C received, shutting down the web server')
    server.server_close()

This web service always returns ["foo","bar","baz"], regardless of the request. Now create a sample Retrofit service to work with it:

import java.util.Iterator;

import retrofit2.Call;
import retrofit2.http.GET;

interface IService {

    @GET("/")
    Call<Iterator<String>> get();

}

Second, Gson does not support iterables or iterators out of the box, so a custom (de)serialization module is needed. The type adapter factory below either creates a special type adapter for iterators or returns null, which lets Gson pick another registered strategy, if any exists:

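import java.lang.reflect.Type;
import java.util.Iterator;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.TypeAdapterFactory;
import com.google.gson.reflect.TypeToken;

final class IteratorTypeAdapterFactory
        implements TypeAdapterFactory {

    private static final TypeAdapterFactory iteratorTypeAdapterFactory = new IteratorTypeAdapterFactory();

    private IteratorTypeAdapterFactory() {
    }

    static TypeAdapterFactory getIteratorTypeAdapterFactory() {
        return iteratorTypeAdapterFactory;
    }

    @Override
    public <T> TypeAdapter<T> create(final Gson gson, final TypeToken<T> type) {
        if ( Iterator.class.isAssignableFrom(type.getRawType()) ) {
            final Type elementType = Reflection.getIteratorParameterType(type.getType());
            @SuppressWarnings("unchecked")
            final TypeAdapter<T> castTypeAdapter = (TypeAdapter<T>) IteratorTypeAdapter.getIteratorTypeAdapter(elementType, gson);
            return castTypeAdapter;
        }
        // null means "not an iterator": Gson moves on to the next factory
        return null;
    }

}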
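Returning null from create() is what lets other strategies take over: Gson asks its registered TypeAdapterFactory instances in turn and uses the first non-null adapter. The JDK-only sketch below models that lookup with hypothetical names, using plain functions that return an adapter description or null, so it shows the chain-of-responsibility idea rather than Gson's actual classes.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class FactoryChainDemo {

    private FactoryChainDemo() {
    }

    // Asks each hypothetical factory in order; the first non-null answer wins
    static String lookup(final List<Function<Class<?>, String>> factories, final Class<?> rawType) {
        for ( final Function<Class<?>, String> factory : factories ) {
            final String adapter = factory.apply(rawType);
            if ( adapter != null ) {
                return adapter;
            }
        }
        throw new IllegalArgumentException("no adapter for " + rawType);
    }

    // An iterator-specific factory first, then a catch-all fallback
    static List<Function<Class<?>, String>> defaultChain() {
        return Arrays.<Function<Class<?>, String>>asList(
                type -> Iterator.class.isAssignableFrom(type) ? "iterator adapter" : null,
                type -> "reflective adapter");
    }

}
```

With this chain, a java.util.ListIterator resolves to the iterator adapter, while String falls through to the fallback.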

Next comes the iterator type adapter. Note that writing an iterator to a stream is very easy, but reading one is much more complex:

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.Iterator;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;

final class IteratorTypeAdapter<T>
        extends TypeAdapter<Iterator<T>> {

    private final Type elementType;
    private final Gson gson;

    private IteratorTypeAdapter(final Type elementType, final Gson gson) {
        this.elementType = elementType;
        this.gson = gson;
    }

    static <T> IteratorTypeAdapter<T> getIteratorTypeAdapter(final Type elementType, final Gson gson) {
        return new IteratorTypeAdapter<>(elementType, gson);
    }

    @Override
    public void write(final JsonWriter out, final Iterator<T> iterator)
            throws IOException {
        if ( iterator == null ) {
            out.nullValue();
            return;
        }
        // Elements are pulled one by one, so only the current one is held in memory
        out.beginArray();
        while ( iterator.hasNext() ) {
            gson.toJson(iterator.next(), elementType, out);
        }
        out.endArray();
    }

    @Override
    public Iterator<T> read(final JsonReader in) {
        // Nothing is read here: the returned iterator consumes the array lazily
        return JsonReaderIterator.getJsonReaderIterator(elementType, gson, in);
    }

}
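Why writing is the easy direction can be shown without Gson at all. The hypothetical sketch below serializes a lazily generated Iterator<Integer> straight into a java.io.Writer as a JSON array, so no List of all elements is ever built. Integers are used only to sidestep JSON string escaping, which a real implementation would leave to Gson's JsonWriter.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class LazyJsonArrayWriter {

    private LazyJsonArrayWriter() {
    }

    // Produces 0, 1, ..., count - 1 on demand instead of storing them
    static Iterator<Integer> range(final int count) {
        return new Iterator<Integer>() {
            private int current;

            @Override
            public boolean hasNext() {
                return current < count;
            }

            @Override
            public Integer next() {
                if ( !hasNext() ) {
                    throw new NoSuchElementException();
                }
                return current++;
            }
        };
    }

    // Writes a JSON array while pulling elements from the iterator one at a time
    static void writeArray(final Iterator<Integer> iterator, final Writer out)
            throws IOException {
        out.write('[');
        boolean first = true;
        while ( iterator.hasNext() ) {
            if ( !first ) {
                out.write(',');
            }
            out.write(String.valueOf(iterator.next()));
            first = false;
        }
        out.write(']');
        out.flush();
    }

    // Convenience wrapper; StringWriter never actually throws IOException
    static String toJsonString(final Iterator<Integer> iterator) {
        final StringWriter out = new StringWriter();
        try {
            writeArray(iterator, out);
        } catch ( final IOException ex ) {
            throw new UncheckedIOException(ex);
        }
        return out.toString();
    }

}
```

On the sending side, laziness only pays off end to end if the request body is written lazily too; as far as I know, the stock GsonConverterFactory request converter, which the converter factory in this answer delegates to, first serializes the whole value into an in-memory buffer.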

And the reading iterator:

import static com.google.gson.stream.JsonToken.END_ARRAY;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.NoSuchElementException;

import com.google.gson.Gson;
import com.google.gson.stream.JsonReader;

final class JsonReaderIterator<T>
        implements Iterator<T>, Closeable {

    private final Type elementType;
    private final Gson gson;
    private final JsonReader in;

    private ReadingIteratorState state = ReadingIteratorState.BEFORE_ARRAY;

    private JsonReaderIterator(final Type elementType, final Gson gson, final JsonReader in) {
        this.elementType = elementType;
        this.gson = gson;
        this.in = in;
    }

    static <T> Iterator<T> getJsonReaderIterator(final Type elementType, final Gson gson, final JsonReader in) {
        return new JsonReaderIterator<>(elementType, gson, in);
    }

    @Override
    public boolean hasNext() {
        try {
            for ( ; ; ) {
                switch ( state ) {
                case BEFORE_ARRAY:
                    // Fails fast (instead of looping forever) if the payload is not a JSON array
                    in.beginArray();
                    state = ReadingIteratorState.WITHIN_ARRAY;
                    continue;
                case WITHIN_ARRAY:
                    if ( in.peek() == END_ARRAY ) {
                        state = ReadingIteratorState.AFTER_ARRAY;
                        continue;
                    }
                    return true;
                case AFTER_ARRAY:
                    in.endArray();
                    state = ReadingIteratorState.END_OF_STREAM;
                    return false;
                case END_OF_STREAM:
                    return false;
                default:
                    throw new AssertionError(state);
                }
            }
        } catch ( final IOException ex ) {
            throw new UncheckedIOException(ex);
        }
    }

    @Override
    public T next() {
        if ( !hasNext() ) {
            throw new NoSuchElementException();
        }
        // hasNext() has positioned the reader at the next array element
        return gson.fromJson(in, elementType);
    }

    @Override
    public void close()
            throws IOException {
        in.close();
    }

    private enum ReadingIteratorState {

        BEFORE_ARRAY,
        WITHIN_ARRAY,
        AFTER_ARRAY,
        END_OF_STREAM

    }

}
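A common alternative to an explicit state machine is one-element lookahead: hasNext() reads and buffers the next element, and next() hands it out, which keeps hasNext() idempotent. For simplicity, the hypothetical LineIterator below yields text lines from a BufferedReader instead of JSON array elements from a JsonReader; it is a JDK-only sketch of the technique, but the closing contract is the same.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class LineIterator
        implements Iterator<String>, Closeable {

    private final BufferedReader reader;
    private String lookahead;   // the buffered next element, or null if none is buffered
    private boolean exhausted;

    LineIterator(final Reader reader) {
        this.reader = new BufferedReader(reader);
    }

    @Override
    public boolean hasNext() {
        if ( lookahead != null ) {
            return true;
        }
        if ( exhausted ) {
            return false;
        }
        try {
            // Read lazily: at most one line is buffered at a time
            lookahead = reader.readLine();
        } catch ( final IOException ex ) {
            throw new UncheckedIOException(ex);
        }
        exhausted = lookahead == null;
        return !exhausted;
    }

    @Override
    public String next() {
        if ( !hasNext() ) {
            throw new NoSuchElementException();
        }
        final String element = lookahead;
        lookahead = null;
        return element;
    }

    @Override
    public void close()
            throws IOException {
        // Closing the iterator releases the underlying reader
        reader.close();
    }

}
```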

Note that the iterator must be closeable so that the underlying resources can be released. To be honest, I don't like the Java convention that close() must also close the underlying resources; I believe closing a resource is the responsibility of the object that opened it. For example, such an iterator does not need to be closed in Spring MVC, because the framework accepts the HTTP request itself, lets your handlers process it, and then closes it. In Retrofit, exposing iterators can leak resources, which is why this implementation has a close method.
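The contract where whoever opens a resource also closes it is often called the execute-around idiom. The JDK-only sketch below (hypothetical names; it is not what the Retrofit code in this answer does) opens a Reader, lends a lazy iterator to a handler, and closes the Reader itself, so the iterator never needs a close() of its own.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.function.Function;
import java.util.function.Supplier;

final class ExecuteAround {

    private ExecuteAround() {
    }

    // Opens the resource, lends a lazy iterator to the handler, then closes the resource
    static <R> R withLines(final Supplier<? extends Reader> opener,
            final Function<? super Iterator<String>, ? extends R> handler) {
        try ( BufferedReader reader = new BufferedReader(opener.get()) ) {
            // The handler must not keep the iterator: it is useless once the reader is closed
            return handler.apply(reader.lines().iterator());
        } catch ( final IOException ex ) {
            throw new UncheckedIOException(ex);
        }
    }

}
```

The trade-off is that the handler has to finish its work inside the call, which is exactly what a framework like Spring MVC arranges and what an asynchronous Retrofit callback does not.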

Next thing: configuring Retrofit to work with iterator-aware Gson instances. Below is a converter factory that can work with lazy iterators. Note that the default GsonConverterFactory cannot be used with iterators, because its response converter closes the response body right after converting it, before anyone can consume the iterator. The remaining two methods can be delegated to the default implementation, and that delegate does not have to be passed in from outside: it can be instantiated privately. Also note that the response converter only leaves the body open when the top-level response type is an iterator. An iterator nested as a field of another object is not treated specially (and, as far as I can tell, the lazy type adapter cannot work in that position anyway, because the enclosing object's adapter would keep reading past the still unconsumed array).

import java.lang.annotation.Annotation;
import java.lang.reflect.Type;

import com.google.gson.Gson;
import com.google.gson.TypeAdapter;
import com.google.gson.reflect.TypeToken;

import okhttp3.RequestBody;
import okhttp3.ResponseBody;
import retrofit2.Converter;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

final class CustomGsonConverterFactory
        extends Converter.Factory {

    private final Gson gson;
    // Delegate for the request body and string converters, which need no special handling
    private final GsonConverterFactory gsonConverterFactory;

    private CustomGsonConverterFactory(final Gson gson) {
        this.gson = gson;
        this.gsonConverterFactory = GsonConverterFactory.create(gson);
    }

    static Converter.Factory getCustomGsonConverterFactory(final Gson gson) {
        return new CustomGsonConverterFactory(gson);
    }

    @Override
    public Converter<ResponseBody, ?> responseBodyConverter(final Type type, final Annotation[] annotations, final Retrofit retrofit) {
        final TypeAdapter<?> adapter = gson.getAdapter(TypeToken.get(type));
        // A top-level iterator reads the body lazily, so its consumer must close it
        final boolean isClosedElsewhere = Reflection.isIterator(type);
        return (Converter<ResponseBody, Object>) responseBody -> {
            try {
                return adapter.read(gson.newJsonReader(responseBody.charStream()));
            } finally {
                if ( !isClosedElsewhere ) {
                    responseBody.close();
                }
            }
        };
    }

    @Override
    public Converter<?, RequestBody> requestBodyConverter(final Type type, final Annotation[] parameterAnnotations, final Annotation[] methodAnnotations,
            final Retrofit retrofit) {
        return gsonConverterFactory.requestBodyConverter(type, parameterAnnotations, methodAnnotations, retrofit);
    }

    @Override
    public Converter<?, String> stringConverter(final Type type, final Annotation[] annotations, final Retrofit retrofit) {
        return gsonConverterFactory.stringConverter(type, annotations, retrofit);
    }

}

The code above uses some reflection utilities:

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Iterator;

final class Reflection {

    private Reflection() {
    }

    // Extracts X from Iterator<X>; a raw Iterator yields Object
    static Type getIteratorParameterType(final Type type)
            throws IllegalArgumentException {
        return getTParameterType(type, Iterator.class);
    }

    // True for Iterator and Iterator<X> only, not for subtypes of Iterator
    static boolean isIterator(final Type type) {
        return Iterator.class.equals(type)
                || type instanceof ParameterizedType && Iterator.class.equals(((ParameterizedType) type).getRawType());
    }

    private static Type getTParameterType(final Type type, final Type expectedParameterizedType)
            throws IllegalArgumentException {
        if ( expectedParameterizedType.equals(type) ) {
            // A raw type carries no type argument: fall back to Object
            return Object.class;
        }
        if ( type instanceof ParameterizedType ) {
            final ParameterizedType parameterizedType = (ParameterizedType) type;
            if ( expectedParameterizedType.equals(parameterizedType.getRawType()) ) {
                final Type[] actualTypeArguments = parameterizedType.getActualTypeArguments();
                if ( actualTypeArguments.length == 1 ) {
                    return actualTypeArguments[0];
                }
            }
        }
        throw new IllegalArgumentException(String.valueOf(type));
    }

}
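To see what kind of java.lang.reflect.Type these helpers receive, the JDK-only sketch below (hypothetical names) reads the generic return type of an interface method, the same kind of object that type.getType() yields in the type adapter factory, and extracts the element type. Here a raw Iterator is mapped to Object.class, since it carries no element type.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Iterator;

final class ElementTypeDemo {

    // Hypothetical service-like interface; only its generic signatures matter
    interface Source {

        Iterator<String> strings();

        @SuppressWarnings("rawtypes")
        Iterator raw();

    }

    private ElementTypeDemo() {
    }

    // Returns X for Iterator<X>, Object for a raw Iterator, and rejects anything else
    static Type elementTypeOf(final Type type) {
        if ( Iterator.class.equals(type) ) {
            return Object.class;
        }
        if ( type instanceof ParameterizedType
                && Iterator.class.equals(((ParameterizedType) type).getRawType()) ) {
            return ((ParameterizedType) type).getActualTypeArguments()[0];
        }
        throw new IllegalArgumentException(String.valueOf(type));
    }

    static Type returnElementType(final String methodName) {
        try {
            final Method method = Source.class.getMethod(methodName);
            // getGenericReturnType() keeps the type arguments that getReturnType() erases
            return elementTypeOf(method.getGenericReturnType());
        } catch ( final NoSuchMethodException ex ) {
            throw new IllegalArgumentException(methodName, ex);
        }
    }

}
```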

The code below uses the following closeable-iterator utilities to close I/O resources (remember that the converter factory creates converters that do not close the response body for iterators).

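import static java.util.Collections.unmodifiableList;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

final class CloseableIterators {

    private CloseableIterators() {
    }

    // Feeds every element to the consumer and closes the iterator even if the consumer throws
    static <T> void forEachAndClose(final Iterator<? extends T> iterator, final Consumer<? super T> consumer)
            throws Exception {
        try {
            while ( iterator.hasNext() ) {
                consumer.accept(iterator.next());
            }
        } finally {
            tryClose(iterator);
        }
    }

    static <T> List<T> collectToListAndClose(final Iterator<? extends T> iterator)
            throws Exception {
        final List<T> list = new ArrayList<>();
        forEachAndClose(iterator, list::add);
        return unmodifiableList(list);
    }

    // Closes the object only if it is AutoCloseable; plain iterators are left alone
    static void tryClose(final Object object)
            throws Exception {
        if ( object instanceof AutoCloseable ) {
            ((AutoCloseable) object).close();
        }
    }

}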

As you can see, all of the methods above try to close the given iterators to free I/O resources. And here is how it all fits together:

final Gson gson = new GsonBuilder()
        .registerTypeAdapterFactory(getIteratorTypeAdapterFactory())
        .create();
final Retrofit retrofit = new Retrofit.Builder()
        // the Python server above listens on port 8080
        .baseUrl("http://localhost:8080/")
        .addConverterFactory(getCustomGsonConverterFactory(gson))
        .build();
final IService service = retrofit.create(IService.class);
final Call<Iterator<String>> getCall = service.get();
getCall.enqueue(new Callback<Iterator<String>>() {
    @Override
    public void onResponse(final Call<Iterator<String>> call, final Response<Iterator<String>> response) {
        try {
            final Iterator<String> iterator = response.body();
            if ( ... ) {
                // consume lazily, one element at a time
                forEachAndClose(iterator, out::println);
            } else if ( ... ) {
                // or materialize all elements at once
                out.println(collectToListAndClose(iterator));
            } else {
                // not interested in the elements: the body must still be closed
                tryClose(iterator);
            }
        } catch ( final Exception ex ) {
            throw new RuntimeException(ex);
        }
    }

    @Override
    public void onFailure(final Call<Iterator<String>> call, final Throwable throwable) {
        throwable.printStackTrace(err);
    }
});

Note that onResponse must close the top-level iterator itself, or the request's resources will leak: even if you don't intend to read the elements, tryClose must be invoked to close the JsonReaderIterator instance. Also remember that, as with any iterator, you can consume it only once.

Edit: closeable iterators enhancement

I think there are at least two ways to make the iterators more robust. Consider an iterator that can be closed:

interface IAutoCloseableIterator<E>
        extends Iterator<E>, AutoCloseable {
}

The JsonReaderIterator above can implement this interface. Then you can:

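  • either return its instances directly, declaring calls like Call<IAutoCloseableIterator<E>> (as far as I can tell, the Reflection helpers above only recognize Iterator itself, so they would need to accept this subtype too);
  • or wrap any iterator, checking whether it is already closeable: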

static <E> IAutoCloseableIterator<E> asAutoCloseable(final Iterator<E> iterator) {
    if ( iterator instanceof IAutoCloseableIterator ) {
        return (IAutoCloseableIterator<E>) iterator;
    }
    return new IAutoCloseableIterator<E>() {
        @Override
        public boolean hasNext() {
            return iterator.hasNext();
        }

        @Override
        public E next() {
            return iterator.next();
        }

        @Override
        public void close()
                throws Exception {
            // Delegate if the wrapped iterator owns a resource (e.g. a Closeable JsonReaderIterator);
            // otherwise there is nothing to release
            if ( iterator instanceof AutoCloseable ) {
                ((AutoCloseable) iterator).close();
            }
        }

    };
}

And then (for the first option):

try ( final IAutoCloseableIterator<String> iterator = response.body() ) {
    while ( iterator.hasNext() ) {
        out.println(iterator.next());
    }
}

Or (for the second option; maybe not that robust...):

try ( final IAutoCloseableIterator<String> iterator = asAutoCloseable(response.body()) ) {
    while ( iterator.hasNext() ) {
        out.println(iterator.next());
    }
}

try-with-resources is your friend here.
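As a JDK-only illustration of why try-with-resources helps, the sketch below uses a hypothetical CloseableIterator interface, declared like IAutoCloseableIterator but with a close() that throws no checked exception, and shows that close() runs even when the method returns from inside the loop.

```java
import java.util.Iterator;

// Hypothetical counterpart of IAutoCloseableIterator; close() narrows the throws clause
interface CloseableIterator<E>
        extends Iterator<E>, AutoCloseable {

    @Override
    void close();

}

final class TryWithResourcesDemo {

    private TryWithResourcesDemo() {
    }

    // Wraps a plain iterator and records whether close() was called
    static final class TrackingIterator<E>
            implements CloseableIterator<E> {

        private final Iterator<E> delegate;
        boolean closed;

        TrackingIterator(final Iterator<E> delegate) {
            this.delegate = delegate;
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public E next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

    }

    // Returns the first element longer than minLength, leaving the loop early
    static String firstLongerThan(final TrackingIterator<String> source, final int minLength) {
        try ( CloseableIterator<String> iterator = source ) {
            while ( iterator.hasNext() ) {
                final String next = iterator.next();
                if ( next.length() > minLength ) {
                    return next;
                }
            }
            return null;
        }
    }

}
```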

Lyubomyr Shaydariv answered Apr 25 '26 22:04