package net.jodah.cancellablefuture import java.util.concurrent.atomic.AtomicBoolean import scala.concurrent._ import scala.concurrent.duration.Duration import scala.util.{Failure, Success, Try} object CancellableFuture { def apply[T](body: => T)(implicit executor: ExecutionContext): CancellableFuture[T] = new CancellableFuture(body) } /** * A future that supports cancellation and interruption, even when backed by a ForkJoinPool. */ class CancellableFuture[T](body: => T)(implicit executor: ExecutionContext) extends Future[T] { private val promise = Promise[T]() private var thread: Thread = null private val cancelled = new AtomicBoolean() promise tryCompleteWith Future { if (!promise.isCompleted) { this.synchronized { thread = Thread.currentThread } try { body } finally { this.synchronized { // Clears the interrupt flag Thread.interrupted() thread = null } } } else null.asInstanceOf[T] } /** * Attempts to cancel the future. Cancellation succeeds if the future is not yet complete or cancelled. A cancelled * future will have a result of Failure(CancellationException). * * @param interrupt Whether to interrupt the running Future * @return True if the execution was cancelled */ def cancel(interrupt: Boolean): Boolean = { if (!promise.isCompleted && cancelled.compareAndSet(false, true)) { promise.tryComplete(Failure(new CancellationException())) if (interrupt) { this.synchronized { if (thread != null) thread.interrupt() } } true } else false } /** * Returns whether the future was cancelled. */ def isCancelled: Boolean = cancelled.get() override def onComplete[U](f: Try[T] => U)(implicit executor: ExecutionContext): Unit = promise.future.onComplete(f) override def isCompleted: Boolean = promise.future.isCompleted override def value: Option[Try[T]] = promise.future.value @throws(classOf[TimeoutException]) @throws(classOf[InterruptedException]) def ready(atMost: Duration)(implicit permit: CanAwait): this.type = CancellableFuture[T](Await.result(promise.future, atMost)).asInstanceOf @throws(classOf[Exception]) def result(atMost: Duration)(implicit permit: CanAwait): T = promise.future.result(atMost) }