Last active
January 30, 2022 12:01
-
-
Save jhalterman/3c6221f8fefd4fa97ce4623a614fad6b to your computer and use it in GitHub Desktop.
A cancellable Scala Future
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.jodah.cancellablefuture | |
import java.util.concurrent.atomic.AtomicBoolean | |
import scala.concurrent._ | |
import scala.concurrent.duration.Duration | |
import scala.util.{Failure, Success, Try} | |
object CancellableFuture { | |
def apply[T](body: => T)(implicit executor: ExecutionContext): CancellableFuture[T] = new CancellableFuture(body) | |
} | |
/** | |
* A future that supports cancellation and interruption, even when backed by a ForkJoinPool. | |
*/ | |
class CancellableFuture[T](body: => T)(implicit executor: ExecutionContext) extends Future[T] { | |
private val promise = Promise[T]() | |
private var thread: Thread = null | |
private val cancelled = new AtomicBoolean() | |
promise tryCompleteWith Future { | |
if (!promise.isCompleted) { | |
this.synchronized { | |
thread = Thread.currentThread | |
} | |
try { | |
body | |
} finally { | |
this.synchronized { | |
// Clears the interrupt flag | |
Thread.interrupted() | |
thread = null | |
} | |
} | |
} else | |
null.asInstanceOf[T] | |
} | |
/** | |
* Attempts to cancel the future. Cancellation succeeds if the future is not yet complete or cancelled. A cancelled | |
* future will have a result of Failure(CancellationException). | |
* | |
* @param interrupt Whether to interrupt the running Future | |
* @return True if the execution was cancelled | |
*/ | |
def cancel(interrupt: Boolean): Boolean = { | |
if (!promise.isCompleted && cancelled.compareAndSet(false, true)) { | |
promise.tryComplete(Failure(new CancellationException())) | |
if (interrupt) { | |
this.synchronized { | |
if (thread != null) | |
thread.interrupt() | |
} | |
} | |
true | |
} else | |
false | |
} | |
/** | |
* Returns whether the future was cancelled. | |
*/ | |
def isCancelled: Boolean = cancelled.get() | |
override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = promise.future.onComplete(f) | |
override def isCompleted: Boolean = promise.future.isCompleted | |
override def value: Option[Try[T]] = promise.future.value | |
@throws(classOf[TimeoutException]) | |
@throws(classOf[InterruptedException]) | |
def ready(atMost: Duration)(implicit permit: CanAwait): this.type = CancellableFuture[T](Await.result(promise.future, atMost)).asInstanceOf | |
@throws(classOf[Exception]) | |
def result(atMost: Duration)(implicit permit: CanAwait): T = promise.future.result(atMost) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.jodah.cancellablefuture | |
import java.util.concurrent.Executors | |
import java.util.concurrent.atomic.AtomicBoolean | |
import org.scalatest.concurrent.{ScalaFutures, Waiters} | |
import org.scalatest.{FunSpec, Matchers} | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.concurrent.{CancellationException, ExecutionContext} | |
class CancellableFutureSpec extends FunSpec with Matchers with Waiters with ScalaFutures { | |
describe("CancelleableFuture") { | |
it("should complete with result") { | |
val f = CancellableFuture("test") | |
f.futureValue shouldBe "test" | |
f.isCompleted shouldBe true | |
f.isCancelled shouldBe false | |
} | |
it("should be cancellable") { | |
val f = CancellableFuture(() -> { | |
Thread.sleep(1000) | |
}) | |
f.cancel(false) shouldBe true | |
f.failed.futureValue.isInstanceOf[CancellationException] | |
f.isCompleted shouldBe true | |
f.isCancelled shouldBe true | |
} | |
it("should immediately cancel pending task") { | |
val ec = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor()) | |
// Fully utilize the ExecutionContext | |
val w = new Waiter() | |
CancellableFuture(() -> { | |
w.await() | |
})(ec) | |
var executed = new AtomicBoolean() | |
val f = CancellableFuture(() -> { | |
executed.set(true) | |
})(ec) | |
f.cancel(false) shouldBe true | |
w.dismiss() | |
Thread.sleep(500) | |
executed.get() shouldBe false | |
f.failed.futureValue.isInstanceOf[CancellationException] | |
f.isCompleted shouldBe true | |
f.isCancelled shouldBe true | |
} | |
it("should not support repeated cancellation") { | |
val f = CancellableFuture(() -> { | |
Thread.sleep(1000) | |
}) | |
f.cancel(false) shouldBe true | |
f.cancel(false) shouldBe false | |
} | |
it("should be interruptable") { | |
val waiter = new Waiter() | |
val f = CancellableFuture(() -> { | |
try { | |
Thread.sleep(1000) | |
} catch { | |
case _: InterruptedException => waiter.dismiss() | |
} | |
}) | |
Thread.sleep(100) | |
f.cancel(true) shouldBe true | |
f.failed.futureValue.isInstanceOf[CancellationException] | |
waiter.await() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Currently, cancel returns true if the task is already running. The body will still continue to run, but the Future will be completed with CancellationException.