Creating Dynamic Proxies with Kotlin Coroutines

By Stuart Williamson, Senior Software Engineer

Most experienced developers are familiar with dynamic proxies. They let us implement an interface at runtime and decide how each method call is handled as it is invoked. This is very useful for adding functionality around existing implementations (especially those from third-party libraries), as described by the Decorator Pattern. However, Kotlin coroutines introduce a new set of problems when creating dynamic proxies. We’ll explore how to properly detect and dynamically invoke suspend functions declared on an interface.
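As a quick refresher, here is a minimal sketch of a plain dynamic proxy with no coroutines involved. The Greeter interface and the loggingProxy helper are hypothetical names made up for this illustration; only Proxy and InvocationHandler come from the JDK.

```kotlin
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Proxy

// Hypothetical interface, used only for this illustration.
interface Greeter {
    fun greet(name: String): String
}

// Wraps a Greeter in a proxy that records each method name before delegating.
fun loggingProxy(target: Greeter, log: MutableList<String>): Greeter {
    val handler = InvocationHandler { _, method, args ->
        log += method.name
        // args is null for zero-argument methods, so fall back to an empty array.
        method.invoke(target, *(args ?: emptyArray()))
    }
    return Proxy.newProxyInstance(
        Greeter::class.java.classLoader,
        arrayOf<Class<*>>(Greeter::class.java),
        handler
    ) as Greeter
}

fun main() {
    val log = mutableListOf<String>()
    val greeter = loggingProxy(object : Greeter {
        override fun greet(name: String) = "Hello, $name"
    }, log)
    println(greeter.greet("Kotlin")) // Hello, Kotlin
    println(log)                     // [greet]
}
```

The handler sees every call on the interface, which is what makes it a convenient place for cross-cutting behavior such as logging or exception handling.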

The Setup

First, create a simple Kotlin console app with Gradle and add the following dependency.

implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3")

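For simplicity, we’ll just throw everything in the main.kt file, starting with the ITest interface that we’ll proxy. The snippets omit imports, so add the java.lang.reflect, kotlin.coroutines, and kotlinx.coroutines imports they rely on at the top of the file. Notice the interface has a mix of regular and suspend functions, some of which throw exceptions. Even though there are no checked exceptions in Kotlin, the JVM’s Proxy still enforces them: if our invocation handler propagates a checked exception that the interface method doesn’t declare, the caller receives an UndeclaredThrowableException instead. Marking the throwing methods with @Throws avoids that.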

interface ITest {
    fun test(): String?
    @Throws(Exception::class)
    fun testException(): String?
    suspend fun testSuspend(): String?
    @Throws(Exception::class)
    suspend fun testExceptionSuspend(): String?
}
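To see why the @Throws annotations matter, here is a small sketch that compares a declared and an undeclared method on a proxied interface. The Risky interface and the thrownBy helper are hypothetical and exist only for this demonstration.

```kotlin
import java.lang.reflect.InvocationHandler
import java.lang.reflect.Proxy

// Hypothetical interface: one method declares its exception, the other does not.
interface Risky {
    fun undeclared(): String

    @Throws(Exception::class)
    fun declared(): String
}

// Calls the proxy and returns the class name of whatever it throws.
fun thrownBy(call: (Risky) -> String): String {
    // The handler fails every call with a checked exception.
    val handler = InvocationHandler { _, _, _ -> throw Exception("boom") }
    val proxy = Proxy.newProxyInstance(
        Risky::class.java.classLoader,
        arrayOf<Class<*>>(Risky::class.java),
        handler
    ) as Risky
    return try {
        call(proxy)
        "nothing"
    } catch (e: Throwable) {
        e.javaClass.name
    }
}

fun main() {
    println(thrownBy { it.undeclared() }) // java.lang.reflect.UndeclaredThrowableException
    println(thrownBy { it.declared() })   // java.lang.Exception
}
```

Without the annotation, callers would have to unwrap UndeclaredThrowableException to reach the real cause.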

We’ll follow that up with a simple underlying implementation of the above ITest interface.

class TestImpl: ITest {
    override fun test(): String? {
        return "test result"
    }

    @Throws(Exception::class)
    override fun testException(): String? {
        throw Exception("You called testException().")
    }

    override suspend fun testSuspend(): String? {
        delay(1000)
        return "testSuspend result"
    }

    @Throws(Exception::class)
    override suspend fun testExceptionSuspend(): String? {
        delay(1000)
        throw Exception("You called testExceptionSuspend().")
    }
}

The Goal

Let’s suppose we want to decorate the TestImpl object with some exception handling. For our purposes, we’ll define exception handling as catching and printing out any exception before wrapping and rethrowing it as our own WrappedTestException. So let’s define that in our main.kt file.

class WrappedTestException(cause: Throwable): Exception(cause)

The Naive Approach

If you are used to doing this sort of thing in Java, it is easy to forget about coroutines, and just as easy to overlook suspend functions when creating a dynamic proxy in Kotlin. Let’s see what happens if we naively do this. We’ll create the generic InvocationHandler below in main.kt. This class delegates invocations to whatever generic instance is passed in. It catches any InvocationTargetException, prints the underlying exception thrown by our delegate instance, and then rethrows it wrapped in our WrappedTestException.

/**
 * File: main.kt
 *
 * Dynamic proxy that naively catches and prints out exceptions and re-throws them wrapped as WrappedTestExceptions.
 */
class NaiveExceptionLogger<T>(private val instance: T): InvocationHandler {
    override fun invoke(proxy: Any?, method: Method?, args: Array<out Any>?): Any? {
        try {
            val nonNullArgs = args ?: arrayOf()
            return method?.invoke(instance, *nonNullArgs)
        } catch(e: InvocationTargetException) {
            e.targetException?.let{ targetException ->
                println("Naively caught underlying target exception $targetException")
                throw WrappedTestException(targetException)
            } ?: throw WrappedTestException(e)
        }
    }
}

Now, let’s create and run it with the following TestFactory and main method. We need to execute inside runBlocking since we are firing off suspend functions from the synchronous main method. What do you think will happen? (Spoiler: it’s gonna fail, but how and why?)

object TestFactory {
    fun createNaively(): ITest {
        return Proxy.newProxyInstance(
            ITest::class.java.classLoader,
            arrayOf<Class<*>>(ITest::class.java),
            NaiveExceptionLogger(TestImpl())) as ITest
    }
}


fun main(args: Array<String>) {
    runBlocking {
        val test = TestFactory.createNaively()
        println(test.test())
        println(test.testSuspend())
        try {
            test.testException()
            throw IllegalStateException("Did not catch testException()")
        } catch(e: WrappedTestException) { }
        try {
            test.testExceptionSuspend()
            throw IllegalStateException("Did not catch testExceptionSuspend()")
        } catch(e: WrappedTestException) { }
    }
}

When you run it, you should get the following output:

test result
testSuspend result
Naively caught underlying target exception java.lang.Exception: You called testException().
Exception in thread "main" java.lang.Exception: You called testExceptionSuspend().
    at TestImpl.testExceptionSuspend(main.kt:39)
    at TestImpl$testExceptionSuspend$1.invokeSuspend(main.kt)
    at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
    at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
    at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
    at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:369)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:403)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:395)
    at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:491)
    at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:489)
    at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
    at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
    at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
    at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
    at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
    at MainKt.main(main.kt:126)

Process finished with exit code 1

Looking at the stack trace, it is clear that the exception thrown by the testExceptionSuspend method didn’t get caught, logged, and wrapped. Why, you ask? Well, because technically that method doesn’t “throw” an exception in the traditional sense. Since it is a suspend function, the compiled method receives a Continuation from its caller and, once it reaches delay, returns immediately with a marker saying that it has suspended. The Continuation is resumed later with either a result or an exception. It can be helpful to think of a Continuation as being somewhat similar to the callbacks behind a Promise in JavaScript. Our NaiveExceptionLogger is out of the picture as soon as that first call returns, long before the Continuation is resumed with the actual outcome. From its perspective, the call completed successfully without an exception and a value was returned; end of story.
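The compiled shape of a suspend function can be checked with plain Java reflection. The sketch below uses a hypothetical Sample interface that mirrors ITest; jvmSignature and isSuspend are illustrative helpers, not part of any library.

```kotlin
import kotlin.coroutines.Continuation

// Hypothetical interface mirroring the shape of ITest.
interface Sample {
    fun regular(): String?
    suspend fun suspending(): String?
}

// Describes a method as the JVM sees it: name, parameter types, and return type.
fun jvmSignature(name: String): String {
    val method = Sample::class.java.declaredMethods.single { it.name == name }
    val params = method.parameterTypes.joinToString { it.simpleName }
    return "$name($params): ${method.returnType.simpleName}"
}

// A compiled suspend function always takes a Continuation as its last parameter.
fun isSuspend(name: String): Boolean =
    Sample::class.java.declaredMethods.single { it.name == name }
        .parameterTypes.lastOrNull() == Continuation::class.java

fun main() {
    println(jvmSignature("regular"))    // regular(): String
    println(jvmSignature("suspending")) // suspending(Continuation): Object
    println(isSuspend("suspending"))    // true
}
```

The Object return type is the giveaway: the method may hand back either the real value or a "suspended" marker, so the declared String? cannot be used in the JVM signature.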

A Correct Approach

If we want to properly capture the exception from the testExceptionSuspend method, in addition to intercepting the method invocation, we must also intercept the result when the Continuation is resumed. To do that, we have to get our hands on this Continuation, but where does that come from? The Kotlin compiler adds it as a “secret” last parameter to the JVM signature of every function marked suspend. In addition to receiving the eventual result, these Continuation objects also carry the CoroutineContext. So, the basic recipe here is to inspect the last argument of the method being invoked. If it’s null or not a Continuation, then this is a regular function we can invoke just as we did in the naive example above. However, if we find a Continuation as the last arg, we need to do the following:

  1. Wrap this underlying Continuation and override the resumeWith method.
  2. If our wrapper Continuation is resumed with an Exception, log it and resume the underlying Continuation with our custom WrappedTestException. Otherwise, resume the underlying with the regular result.
  3. Next, we need to perform the execution of this method inside of a CoroutineScope. Don’t be tempted to launch this invocation in the GlobalScope. Doing so will break the structured concurrency model in Kotlin and invocations may not get cleaned up or canceled as otherwise expected. Fortunately, we can properly build a CoroutineScope using the context found in our underlying Continuation.

We can also compose this context with our own requirements, such as running the invocation in the IO thread pool via Dispatchers.IO. When invoking the method, we then just need to swap the last argument for our new and improved Continuation.
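One detail worth checking when composing contexts: the + operator keeps the right-hand element when both sides define the same key, so operand order decides which dispatcher wins. The sketch below assumes the kotlinx-coroutines-core dependency from the setup; usesIo is a hypothetical helper written for this example.

```kotlin
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking

// True when the given context would dispatch work to Dispatchers.IO.
fun usesIo(context: CoroutineContext): Boolean =
    context[ContinuationInterceptor] === Dispatchers.IO

fun main() = runBlocking {
    // runBlocking installs its own event loop as the dispatcher of this context.
    val callerContext = coroutineContext
    // The caller's dispatcher is on the right, so it replaces Dispatchers.IO.
    println(usesIo(Dispatchers.IO + callerContext)) // false
    // Dispatchers.IO is on the right, so it replaces the caller's dispatcher.
    println(usesIo(callerContext + Dispatchers.IO)) // true
}
```

Passing only Dispatchers.IO to launch has the same effect as the second form, because launch adds its argument on the right of the scope's context.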

Now that we have launched our method invocation in the IO thread pool, what do we return from our dynamically invoked method? Compiled suspend functions return a special marker value to signal that they have suspended and will deliver their result through the Continuation later, which frees the current thread to move on to other work. That marker is kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED.
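The marker is easiest to understand by starting a suspend block by hand. The following sketch uses only the standard library; startRaw is a hypothetical helper, and the continuation it builds just collects whatever is delivered to it.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.startCoroutineUninterceptedOrReturn
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine

// Starts a suspend block directly with an explicit continuation, as compiled code would.
// Values that arrive through the continuation are collected in `delivered`.
fun startRaw(block: suspend () -> String, delivered: MutableList<String>): Any? {
    val completion = Continuation<String>(EmptyCoroutineContext) { result ->
        delivered += result.getOrThrow()
    }
    return block.startCoroutineUninterceptedOrReturn(completion)
}

fun main() {
    val delivered = mutableListOf<String>()

    // Case 1: the block never suspends, so its value is simply returned.
    println(startRaw({ "returned directly" }, delivered)) // returned directly
    println(delivered) // [] because the continuation was never resumed

    // Case 2: the block suspends, so the call returns the marker instead.
    var parked: Continuation<String>? = null
    val outcome = startRaw({ suspendCoroutine<String> { parked = it } }, delivered)
    println(outcome === COROUTINE_SUSPENDED) // true

    // Resuming later delivers the value through the continuation.
    parked?.resume("delivered later")
    println(delivered) // [delivered later]
}
```

Case 1 is the one to watch in a proxy: a suspend function that finishes without suspending hands its value back as a plain return value and never resumes the Continuation it was given.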

Here’s what the whole thing looks like when it’s all put together:

/**
 * File: main.kt
 *
 * Dynamic proxy that correctly catches and prints out exceptions with proper handling for coroutines.
 * Rethrows caught exceptions as WrappedTestException.
 *
 * Besides the imports used so far, this needs kotlin.coroutines.resume,
 * kotlin.coroutines.resumeWithException, kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
 * and kotlin.coroutines.intrinsics.intercepted at the top of main.kt.
 */
class CorrectExceptionLogger<T>(private val instance: T): InvocationHandler {
    override fun invoke(proxy: Any?, method: Method?, args: Array<out Any>?): Any? {
        val nonNullArgs = args ?: arrayOf()
        try {
            val lastArg = nonNullArgs.lastOrNull()
            if(lastArg !is Continuation<*>) {
                // not a suspend func, just invoke regularly
                return method?.invoke(instance, *nonNullArgs)
            } else {
                // Step 1: Wrap the underlying continuation to intercept exceptions.
                @Suppress("UNCHECKED_CAST")
                val originalContinuation = lastArg as Continuation<Any?>
                val wrappedContinuation = object: Continuation<Any?> {
                    override val context: CoroutineContext get() = originalContinuation.context
                    override fun resumeWith(result: Result<Any?>) {
                        // intercepted() hands the resumption back to the caller's own dispatcher,
                        // which matters when we are resumed from an IO thread.
                        val caller = originalContinuation.intercepted()
                        val err = result.exceptionOrNull()
                        if(err != null) {
                            // Step 2: log intercepted exception and resume with our custom wrapped exception.
                            println("Correctly caught underlying coroutine exception $err")
                            caller.resumeWithException(WrappedTestException(err))
                        } else {
                            caller.resumeWith(result)
                        }
                    }
                }
                // Step 3: launch the suspend function with our wrapped continuation in a scope built from the
                // underlying context. launch() adds its argument on the right of the scope's context, so
                // Dispatchers.IO replaces the caller's dispatcher. (Dispatchers.IO + originalContinuation.context
                // would do the opposite, because the right-hand side wins.)
                CoroutineScope(originalContinuation.context).launch(Dispatchers.IO) {
                    val argumentsWithoutContinuation = nonNullArgs.take(nonNullArgs.size - 1)
                    val newArgs = argumentsWithoutContinuation + wrappedContinuation
                    try {
                        val returned = method?.invoke(instance, *newArgs.toTypedArray())
                        // A suspend function that finishes without suspending returns its value directly
                        // and never resumes the continuation, so we have to resume it ourselves.
                        if(returned !== COROUTINE_SUSPENDED) {
                            wrappedContinuation.resume(returned)
                        }
                    } catch(e: InvocationTargetException) {
                        // The function threw before its first suspension point.
                        wrappedContinuation.resumeWithException(e.targetException ?: e)
                    }
                }
                return COROUTINE_SUSPENDED
            }
        } catch(e: InvocationTargetException) {
            e.targetException?.let{ targetException ->
                println("Correctly caught underlying exception $targetException")
                throw WrappedTestException(targetException)
            } ?: throw WrappedTestException(e)
        }
    }
}

Run It!

Okay, if you’re still here, let’s add this new implementation to our TestFactory and run it.

object TestFactory {
    fun createNaively(): ITest {
        return Proxy.newProxyInstance(
            ITest::class.java.classLoader,
            arrayOf<Class<*>>(ITest::class.java),
            NaiveExceptionLogger(TestImpl())) as ITest
    }

    fun createCorrectly(): ITest {
        return Proxy.newProxyInstance(
            ITest::class.java.classLoader,
            arrayOf<Class<*>>(ITest::class.java),
            CorrectExceptionLogger(TestImpl())) as ITest
    }
}


fun main(args: Array<String>) {
    runBlocking {
        val test = TestFactory.createCorrectly()
        println(test.test())
        println(test.testSuspend())
        try {
            test.testException()
            throw IllegalStateException("Did not catch testException()")
        } catch(e: WrappedTestException) { }
        try {
            test.testExceptionSuspend()
            throw IllegalStateException("Did not catch testExceptionSuspend()")
        } catch(e: WrappedTestException) { }
    }
}

Your output should look like the following. If so, congratulations! You have now created a generic dynamic proxy that will work with both regular and suspend functions.

test result
testSuspend result
Correctly caught underlying exception java.lang.Exception: You called testException().
Correctly caught underlying coroutine exception java.lang.Exception: You called testExceptionSuspend().

Process finished with exit code 0