Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement NIO Socket (client) using Kotlin coroutines in Java Code?

I want to use Kotlin(v1.3.0) coroutines & java.nio.channels.SocketChannel (NIO) to replace Socket connect (blocking IO) in Android. because this can save many number of threads.

The code below can't run because of job.await() is suspending function in Kotlin, It just can be called in Ktolin coroutines block. like launch{..}, async{..}.

// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {

    // Start a new connection
    // Create a non-blocking socket channel
    val socketChannel = SocketChannel.open()
    socketChannel.configureBlocking(false)

    // async calls NIO connect function
    val job = GlobalScope.async(oneThreadCtx) {
        aConnect(socketChannel, address)
    }

    // I what to suspend(NOT block) current Java Thread, until connect is success
    job.await()

    return socketChannel
}

But, I tried to use runBlocking{..} make this function as normal function in Java. but job.await blocked current Java Thread, Not suspend.

so, how should I implement this function with Kotlin(v1.3.0) coroutines?

like image 575
haha Avatar asked Dec 12 '18 04:12

haha


People also ask

Can I use Kotlin coroutines in Java?

We can consume this API from Kotlin coroutine to load two images and combine then asynchronously. The resulting function returns CompletableFuture<Image> for ease of use back from Java. Note that this module should be used only for integration with existing Java APIs based on CompletableFuture .

What is Kotlin coroutines example?

A coroutine is a concurrency design pattern that you can use on Android to simplify code that executes asynchronously. Coroutines were added to Kotlin in version 1.3 and are based on established concepts from other languages.

How do you communicate between two coroutines?

Channels form the foundational component for communicating between coroutines. A Channel implements both the SendChannel and ReceiveChannel interface.

How do I launch coroutines Kotlin?

launch. The simplest way to create a coroutine is by calling the launch builder on a specified scope. It Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a Job. The coroutine is canceled when the resulting job is canceled.


2 Answers

As Marko pointed out your code will still end up blocking a thread even when that blocking operation is in the async coroutine. To truly get the asynchronous behavior you desire with Java and Kotlin you need to use the Async version of Socket Channel

With this, you get true asynchronous socket handling. With that class and Kotlin's suspendCoroutine builder method, you can turn the async handlers into suspendable calls.

Here is an example of implementing it for read:

class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun read(buffer: ByteBuffer): Int {
        return socket.asyncRead(buffer)
    }

    fun close() {
        socket.close()
    }

    private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
        return suspendCoroutine { continuation ->
           this.read(buffer, continuation, ReadCompletionHandler)
        }
    }

    object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) {
            attachment.resume(result)
        }

        override fun failed(exc: Throwable, attachment: Continuation<Int>) {
            attachment.resumeWithException(exc)
        }
    }
}

You could choose to remove the wrapping that I'm doing here and just expose an asyncRead method on AsynchronousSocketChannel like so:

suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
    return suspendCoroutine { continuation ->
       this.read(buffer, continuation, ReadCompletionHandler)
    }
}

object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
    override fun completed(result: Int, attachment: Continuation<Int>) {
        attachment.resume(result)
    }

    override fun failed(exc: Throwable, attachment: Continuation<Int>) {
        attachment.resumeWithException(exc)
    }
}

It's all a matter of taste and what exactly your design goals are. You should be able to implement a similar method for the initial connect as I did here for read.

like image 120
PatTheGamer Avatar answered Oct 10 '22 12:10

PatTheGamer


// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()

This is not a realistic expectation. From the perspective of Java, a suspend fun suspends its execution by returning a special constant, COROUTINE_SUSPENDED. You need the Kotlin compiler to hide that from you and allow you to write regular-looking code that's suspendable.

Your code falls short of non-blocking suspension even from the Kotlin perspective because it uses a blocking call to connect. Submitting that call to another thread doesn't make it non-blocking.

What your code does is completely equivalent to submitting a job to a Java executor service and then awaiting on its result. You can for example use CompletableFuture.supplyAsync.

like image 28
Marko Topolnik Avatar answered Oct 10 '22 11:10

Marko Topolnik