Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I create a file sending client/server with RSocket?

Tags:

I can't seem to find any resources/tutorials on RSocket, other than just reading their code on GitHub, which I don't understand.

I have a file's path on my server: String serverFilePath;

I'd like to be able to download it from my client (using RSocket's Aeron implementation, preferably). Does anyone know how to do this using RSocket?

Thanks in advance.

like image 792
James Avatar asked Apr 27 '18 11:04

James


1 Answers

I work on RSocket, and wrote a large portion of the java version including the Aeron transport.

I wouldn't recommend using the Aeron implementation currently. There's a couple ways you can send files:

  1. Using a requestChannel to push the data to a remote server.
  2. Use requestChannel or requestStream to stream bytes to a client.

Here's an example using requestStream:

  public class FileCopy {

  public static void main(String... args) throws Exception {

    // Create a socket that receives incoming connections
    RSocketFactory.receive()
        .acceptor(
            new SocketAcceptor() {
              @Override
              // Create a new socket acceptor
              public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                return Mono.just(
                    new AbstractRSocket() {
                      @Override
                      public Flux<Payload> requestStream(Payload payload) {
                        // Get the path of the file to copy
                        String path = payload.getDataUtf8();
                        SeekableByteChannel _channel = null;

                        try {
                          _channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
                        } catch (IOException e) {
                          return Flux.error(e);
                        }

                        ReferenceCountUtil.safeRelease(payload);

                        SeekableByteChannel channel = _channel;
                        // Use Flux.generate to create a publisher that returns file at 1024 bytes
                        // at a time
                        return Flux.generate(
                            sink -> {
                              try {
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                int read = channel.read(buffer);
                                buffer.flip();
                                sink.next(DefaultPayload.create(buffer));

                                if (read == -1) {
                                  channel.close();
                                  sink.complete();
                                }
                              } catch (Throwable t) {
                                sink.error(t);
                              }
                            });
                      }
                    });
              }
            })
        .transport(TcpServerTransport.create(9090))
        .start()
        .subscribe();

    String path = args[0];
    String dest = args[1];

    // Connect to a server
    RSocket client =
        RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();

    File f = new File(dest);
    f.createNewFile();

    // Open a channel to a new file
    SeekableByteChannel channel =
        Files.newByteChannel(f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);

    // Request a stream of bytes
    client
        .requestStream(DefaultPayload.create(path))
        .doOnNext(
            payload -> {
              try {
                // Write the bytes received to the new file
                ByteBuffer data = payload.getData();
                channel.write(data);

                // Release the payload
                ReferenceCountUtil.safeRelease(payload);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
            })
        // Block until all the bytes are received
        .blockLast();

    // Close the file you're writing too
    channel.close();
  }
}
like image 158
Robert Roeser Avatar answered Sep 28 '22 17:09

Robert Roeser