Skip to content

Instantly share code, notes, and snippets.

@defHLT
Created September 5, 2017 20:15

Revisions

  1. defHLT created this gist Sep 5, 2017.
    123 changes: 123 additions & 0 deletions rx-coroutines.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,123 @@
    import hu.akarnokd.rxjava2.schedulers.BlockingScheduler
    import io.reactivex.Single
    import io.reactivex.functions.BiFunction
    import io.reactivex.plugins.RxJavaPlugins
    import io.reactivex.schedulers.Schedulers
    import kotlinx.coroutines.experimental.CommonPool
    import kotlinx.coroutines.experimental.async
    import kotlinx.coroutines.experimental.launch
    import kotlinx.coroutines.experimental.runBlocking
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeoutException

    fun main(args: Array<String>) {
    runBlocking { coroutineWay() }
    // reactiveWay()
    }

    suspend fun f1(i: Int): Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 1;
    }

    suspend fun f2(i: Int): Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2;
    }

    suspend fun coroutineWay() {
    val t0 = System.currentTimeMillis()

    var i = 0;
    while (true) { // (1)
    println("Attempt " + (i + 1) + " at T=" +
    (System.currentTimeMillis() - t0))

    var v1 = async(CommonPool) { f1(i) } // (2)
    var v2 = async(CommonPool) { f2(i) }

    var v3 = launch(CommonPool) { // (3)
    Thread.sleep(500)
    println(" Cancelling at T=" +
    (System.currentTimeMillis() - t0))
    val te = TimeoutException();
    v1.cancel(te); // (4)
    v2.cancel(te);
    }

    try {
    val r1 = v1.await(); // (5)
    val r2 = v2.await();
    v3.cancel(); // (6)
    println(r1 + r2)
    break;
    } catch (ex: TimeoutException) { // (7)
    println(" Crash at T=" +
    (System.currentTimeMillis() - t0))
    if (++i > 2) { // (8)
    throw ex;
    }
    }
    }
    println("End at T="
    + (System.currentTimeMillis() - t0)) // (9)

    }

    fun f3(i: Int) : Int {
    println("str $i")
    try {
    Thread.sleep(if (i != 2) 2000L else 200L)
    } catch (e: Exception) {
    println("caught $e")
    throw e
    }
    println("end $i")
    return 1
    }

    fun f4(i: Int) : Int {
    Thread.sleep(if (i != 2) 2000L else 200L)
    return 2
    }

    fun reactiveWay() {
    RxJavaPlugins.setErrorHandler({ }) // (1)

    val sched = BlockingScheduler() // (2)
    sched.execute {
    val t0 = System.currentTimeMillis()
    val count = Array<Int>(1, { 0 }) // (3)

    Single.defer({ // (4)
    val c = count[0]++;
    println("Attempt " + (c + 1) +
    " at T=" + (System.currentTimeMillis() - t0))

    Single.zip( // (5)
    Single.fromCallable({ f3(c) })
    .subscribeOn(Schedulers.io()),
    Single.fromCallable({ f4(c) })
    .subscribeOn(Schedulers.io()),
    BiFunction<Int, Int, Int> { a, b -> a + b } // (6)
    )
    })
    .doOnDispose({ // (7)
    println(" Cancelling at T=" +
    (System.currentTimeMillis() - t0))
    })
    .timeout(500, TimeUnit.MILLISECONDS) // (8)
    .retry({ x, e ->
    println(" Crash at " +
    (System.currentTimeMillis() - t0))
    x < 3 && e is TimeoutException // (9)
    })
    .doAfterTerminate { sched.shutdown() } // (10)
    .subscribe({
    println(it)
    println("End at T=" +
    (System.currentTimeMillis() - t0)) // (11)
    },
    { it.printStackTrace() })
    }
    }