Created
May 5, 2020 16:59
-
-
Save ZakTaccardi/64e7e70fb75ea5cba48a3797d75e6fe4 to your computer and use it in GitHub Desktop.
StateActor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kotlinx.coroutines.CompletionHandler | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.CoroutineStart | |
import kotlinx.coroutines.channels.SendChannel | |
import kotlinx.coroutines.channels.actor | |
import kotlinx.coroutines.flow.Flow | |
import kotlin.coroutines.CoroutineContext | |
import kotlin.coroutines.EmptyCoroutineContext | |
/**
 * A [SendChannel] of intentions ([I]) that additionally exposes the state ([S]) those intentions update.
 */
interface StateActor<S, I> : SendChannel<I> {

    /**
     * Observe the state exposed by this actor as it changes over time. A value is always emitted upon
     * subscription; *when* that first value is emitted is controlled by [FirstEmission].
     *
     * To update this state, use:
     * - [offer]
     * - [send]
     */
    val states: Flow<S>

    /**
     * Thread safe access to the current state ([S]). If there are pending intentions ([I]) when this function is
     * called, the result is delayed until those intentions are processed.
     *
     * To update this state, use:
     * - [offer]
     * - [send]
     */
    suspend fun currentState(): S

    /**
     * Controls how the `Flow<S>` from [StateActor.states] emits its first value.
     */
    enum class FirstEmission {

        /**
         * [StateActor.states] emits its current value immediately upon subscription, even if that current value
         * is stale due to pending intentions that have been submitted but not yet processed.
         */
        IMMEDIATE,

        /**
         * When [StateActor.states] is subscribed to, it waits for any pending intentions to be processed before
         * its first value is emitted. At that moment, the first value is the latest current state with those
         * intentions accounted for.
         */
        SAFE
    }
}
/**
 * Suspending reducer used by a [StateActor]: given the `currentState` ([S]) and a newly received `intention` ([I]),
 * it returns the state ([S]) that should replace `currentState`.
 *
 * The function is invoked with a [CoroutineScope] as its receiver.
 */
typealias StateActorReducer<S, I> = suspend CoroutineScope.(currentState: S, intention: I) -> S
/**
 * Construct a new [StateActor]
 * - overload that provides an [initialState] immediately
 *
 * @param firstEmission see [StateActor.FirstEmission]
 * @param initialState the state the actor holds before any intention has been reduced
 * @param logger logger/class pair; its `first` component is passed on as the logger and its `second` component as
 * the class it logs for. Defaults to [noOpLogger].
 * @param reduce used to update the state
 *
 * @see [actor]
 * @param context - see [actor]
 * @param capacity - see [actor]
 * @param start - see [actor]
 * @param onCompletion - see [actor]
 */
fun <S, I> CoroutineScope.stateActor(
    // Fully qualified: FirstEmission is nested inside StateActor, so the bare name only resolves with an extra import.
    firstEmission: StateActor.FirstEmission,
    initialState: S,
    logger: LoggerForClass = noOpLogger,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    reduce: StateActorReducer<S, I>
): StateActor<S, I> = StateActorImpl(
    scope = this,
    firstEmission = firstEmission,
    initialStateProvider = InitialStateProvider.AlreadyProvided(initialState),
    logger = logger.first,
    kClass = logger.second,
    context = context,
    capacity = capacity,
    start = start,
    onCompletion = onCompletion,
    reduce = reduce
)
/**
 * Construct a new [StateActor]
 * - overload that provides an initial state when the actor is first started
 *
 * @param firstEmission see [StateActor.FirstEmission]
 * @param provideInitialState suspending function that supplies the initial state
 * @param logger logger/class pair; its `first` component is passed on as the logger and its `second` component as
 * the class it logs for. Defaults to [noOpLogger].
 * @param reduce used to update the state
 *
 * @see [actor]
 * @param context - see [actor]
 * @param capacity - see [actor]
 * @param start - see [actor]
 * @param onCompletion - see [actor]
 */
fun <S, I> CoroutineScope.stateActor(
    // Fully qualified: FirstEmission is nested inside StateActor, so the bare name only resolves with an extra import.
    firstEmission: StateActor.FirstEmission,
    provideInitialState: suspend () -> S,
    logger: LoggerForClass = noOpLogger,
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    reduce: StateActorReducer<S, I>
): StateActor<S, I> = StateActorImpl(
    scope = this,
    firstEmission = firstEmission,
    initialStateProvider = InitialStateProvider.SuspendingProvider(provideInitialState),
    logger = logger.first,
    kClass = logger.second,
    context = context,
    capacity = capacity,
    start = start,
    onCompletion = onCompletion,
    reduce = reduce
)
import kotlinx.coroutines.CompletableDeferred | |
import kotlinx.coroutines.CompletionHandler | |
import kotlinx.coroutines.CoroutineScope | |
import kotlinx.coroutines.CoroutineStart | |
import kotlinx.coroutines.ExperimentalCoroutinesApi | |
import kotlinx.coroutines.channels.ConflatedBroadcastChannel | |
import kotlinx.coroutines.channels.SendChannel | |
import kotlinx.coroutines.channels.actor | |
import kotlinx.coroutines.flow.Flow | |
import kotlinx.coroutines.flow.asFlow | |
import kotlinx.coroutines.flow.emitAll | |
import kotlinx.coroutines.flow.flow | |
import kotlinx.coroutines.selects.SelectClause2 | |
import kotlin.coroutines.CoroutineContext | |
import kotlin.reflect.KClass | |
/**
 * Implementation of [StateActor].
 *
 * Intentions are wrapped in [StateActorIntention] and sent to an internal [actor], which handles them one after
 * another in the order it receives them. The latest state is published through a [ConflatedBroadcastChannel].
 *
 * @param scope scope the internal [actor] is launched in
 * @param firstEmission selects how [states] produces its first value, see [StateActor.FirstEmission]
 * @param initialStateProvider either an already available initial state or a suspending provider for it
 * @param logger source of the state / intention / "Get" loggers used by this actor
 * @param kClass class passed to [logger] when those loggers are created
 * @param context - see [actor]
 * @param capacity - see [actor]
 * @param start - see [actor]
 * @param onCompletion - see [actor]
 * @param reduce computes the next state from the current state and an incoming intention
 */
internal class StateActorImpl<S, I>(
    scope: CoroutineScope,
    firstEmission: StateActor.FirstEmission,
    initialStateProvider: InitialStateProvider<S>,
    logger: IdentityLogger,
    kClass: KClass<*>,
    context: CoroutineContext,
    capacity: Int,
    start: CoroutineStart,
    onCompletion: CompletionHandler?,
    reduce: StateActorReducer<S, I>
) : StateActor<S, I> {

    // Holds the most recently published state. The type is spelled out so that the no-argument constructor call
    // below does not depend on type inference across the `when` branches.
    @ExperimentalCoroutinesApi
    private val stateChannel: ConflatedBroadcastChannel<S> = when (initialStateProvider) {
        is InitialStateProvider.AlreadyProvided -> {
            val initialState = initialStateProvider.state
            logger.asStateLogger(kClass)
                .log(initialState)
            ConflatedBroadcastChannel(initialState)
        }
        is InitialStateProvider.SuspendingProvider -> {
            // initial state will be provided late when actor is initialized
            ConflatedBroadcastChannel()
        }
    }

    private val actor = scope.actor<StateActorIntention<S, I>>(
        context = context,
        capacity = capacity,
        start = start,
        onCompletion = onCompletion
    ) {
        val stateLogger = logger.asStateLogger(kClass)
        val intentionLogger = logger.asIntentionLogger(kClass)
        val getLogger = logger.asLabelLogger(kClass, "Get")
        val stateChannelFlow = stateChannel.asFlow()

        var currentState: S = when (initialStateProvider) {
            is InitialStateProvider.AlreadyProvided -> stateChannel.value
            is InitialStateProvider.SuspendingProvider -> {
                // The suspending provider is only invoked here, once the actor coroutine is running.
                val initialState = initialStateProvider.provide()
                stateChannel.send(initialState)
                stateLogger.log(initialState)
                initialState
            }
        }

        for (intention in channel) {
            when (intention) {
                is StateActorIntention.Actual -> {
                    intentionLogger.log(intention.intention)
                    currentState = reduce(this@actor, currentState, intention.intention)
                    // Identity comparison: when the reducer returns the very same instance there is nothing new
                    // to publish, so the channel is left untouched.
                    if (stateChannel.value !== currentState) {
                        stateChannel.send(currentState)
                    }
                }
                is StateActorIntention.Get -> {
                    getLogger.log(currentState)
                    intention.deferred.complete(currentState)
                }
                is StateActorIntention.Observe -> {
                    // Completed exactly once, and only after every intention queued ahead of this one was handled.
                    intention.deferred.complete(stateChannelFlow)
                }
            }
        }
    }

    override val states: Flow<S> = when (firstEmission) {
        StateActor.FirstEmission.IMMEDIATE -> stateChannel.asFlow()
        StateActor.FirstEmission.SAFE -> flow {
            // Route the subscription through the actor so the flow is handed out only after the intentions
            // already queued in front of it have been processed.
            val intention = StateActorIntention.Observe<S, I>()
            actor.send(intention)
            emitAll(intention.deferred.await())
        }
    }

    /**
     * Sends a [StateActorIntention.Get] to the actor and suspends until the actor answers it with its current
     * state.
     */
    override suspend fun currentState(): S {
        val intention = StateActorIntention.Get<S, I>()
        actor.send(intention)
        return intention.deferred.await()
    }

    // NOTE(review): `SendChannel.onSend` is not overridden anywhere in this class as shown. This private property
    // only captures the wrapped actor's select clause and is not read in this file - confirm whether an
    // `onSend` override was intended.
    private val _onSend: SelectClause2<StateActorIntention<S, I>, SendChannel<StateActorIntention<S, I>>> = actor.onSend

    @ExperimentalCoroutinesApi
    override val isClosedForSend: Boolean
        get() = actor.isClosedForSend

    override fun close(cause: Throwable?): Boolean = actor.close(cause)

    @ExperimentalCoroutinesApi
    override fun invokeOnClose(handler: (cause: Throwable?) -> Unit) = actor.invokeOnClose(handler)

    /** Wraps [element] in a [StateActorIntention.Actual] and offers it to the actor. */
    override fun offer(element: I): Boolean = actor.offer(StateActorIntention.Actual(element))

    /** Wraps [element] in a [StateActorIntention.Actual] and sends it to the actor. */
    override suspend fun send(element: I) = actor.send(StateActorIntention.Actual(element))

    // Not supported: reading this property always throws.
    @ExperimentalCoroutinesApi
    override val isFull: Boolean get() = throw NotImplementedError("Deprecated")
}
/**
 * Describes where the initial state ([S]) of a state actor comes from.
 */
internal sealed class InitialStateProvider<S> {

    /** The initial [state] is already available. */
    data class AlreadyProvided<S>(
        val state: S
    ) : InitialStateProvider<S>()

    /** The initial state is obtained by invoking the suspending [provide] function. */
    data class SuspendingProvider<S>(
        val provide: suspend () -> S
    ) : InitialStateProvider<S>()
}
/**
 * Internal message type of the state actor. Besides wrapping the caller's own intention ([Actual]), it carries
 * the extra requests that allow us to support the [StateActor] features.
 */
private sealed class StateActorIntention<S, I> {

    /**
     * Wraps an [intention] submitted by a caller.
     */
    data class Actual<S, I>(
        val intention: I
    ) : StateActorIntention<S, I>()

    /**
     * Supports [StateActor.currentState]; the answer is delivered through [deferred].
     */
    data class Get<S, I>(
        val deferred: CompletableDeferred<S> = CompletableDeferred()
    ) : StateActorIntention<S, I>()

    /**
     * Supports [StateActor.states] and [StateActor.FirstEmission]; the flow to collect is delivered through
     * [deferred].
     */
    data class Observe<S, I>(
        val deferred: CompletableDeferred<Flow<S>> = CompletableDeferred()
    ) : StateActorIntention<S, I>()
}
/**
 * Default [LoggerForClass], built from a [NoOpIdentityLogger] for `Nothing::class`.
 *
 * NOTE(review): presumably this logger discards everything, going by its name - confirm against
 * [NoOpIdentityLogger].
 */
internal val noOpLogger: LoggerForClass =
    NoOpIdentityLogger().forClass(Nothing::class)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment