Last active
May 14, 2017 17:03
-
-
Save evant/7976ed19db6c31ac2a9c814ff8b12d3a to your computer and use it in GitHub Desktop.
A single cache that allows evicting the value or error. Subscriptions after the value is evicted will re-trigger the upstream observer.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (c) 2016-present, RxJava Contributors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in | |
* compliance with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software distributed under the License is | |
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See | |
* the License for the specific language governing permissions and limitations under the License. | |
*/ | |
package me.tatarka.rxcache2; | |
import io.reactivex.*; | |
import io.reactivex.disposables.Disposable; | |
import io.reactivex.exceptions.Exceptions; | |
import io.reactivex.functions.BiFunction; | |
import io.reactivex.functions.Function; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicReference; | |
/** | |
* A single cache that allows evicting the value or error. Subscriptions after the value is evicted will re-trigger the | |
* upstream observer. | |
*/ | |
public class EvictableSingleCache<T> extends Single<T> implements SingleObserver<T>, CompletableObserver { | |
@SuppressWarnings("rawtypes") | |
static final CacheDisposable[] EMPTY = new CacheDisposable[0]; | |
@SuppressWarnings("rawtypes") | |
static final CacheDisposable[] TERMINATED = new CacheDisposable[0]; | |
final SingleSource<? extends T> source; | |
final BiFunction<? super T, Throwable, Completable> evictionFunction; | |
final AtomicInteger wip; | |
final AtomicReference<CacheDisposable<T>[]> observers; | |
T value; | |
Throwable error; | |
/** | |
* Constructs a new cached observable that is never evicted. This is equivalent to {@link Single#cache()} | |
*/ | |
public EvictableSingleCache(Single<T> source) { | |
this(source, Completable.never()); | |
} | |
/** | |
* Constructs a new cached observable that's evicted when the given completable completes. The eviction completable | |
* will be subscribed to when the source emits a value or error. | |
*/ | |
public EvictableSingleCache(Single<T> source, final Completable eviction) { | |
this(source, new BiFunction<T, Throwable, Completable>() { | |
@Override | |
public Completable apply(T t, Throwable throwable) throws Exception { | |
return eviction; | |
} | |
}); | |
} | |
/** | |
* Constructs a new cached observable that's evicted when the given completable completes. The success completable | |
* will be subscribed to if the source emits a value, and the error completable will be subscribed to if the source | |
* emits an error. | |
*/ | |
public EvictableSingleCache(Single<T> source, final Completable successEviction, final Completable errorEviction) { | |
this(source, new BiFunction<T, Throwable, Completable>() { | |
@Override | |
public Completable apply(T value, Throwable error) throws Exception { | |
return value != null ? successEviction : errorEviction; | |
} | |
}); | |
} | |
/** | |
* Constructs a new cached observable that's evicted when the given completable completes. The success function | |
* will be invoked if the source emits a value, and the error function will be invoked if the source emits an | |
* error. | |
*/ | |
public EvictableSingleCache(Single<T> source, final Function<? super T, Completable> successEvictionFunction, final Function<Throwable, Completable> errorEvictionFunction) { | |
this(source, new BiFunction<T, Throwable, Completable>() { | |
@Override | |
public Completable apply(T value, Throwable error) throws Exception { | |
return (value != null ? successEvictionFunction.apply(value) : errorEvictionFunction.apply(error)); | |
} | |
}); | |
} | |
/** | |
* Constructs a new cached observable that's evicted when the given completable completes. The function | |
* will be invoked when the source emits a value or error. | |
*/ | |
public EvictableSingleCache(Single<T> source, BiFunction<? super T, Throwable, Completable> evictionFunction) { | |
this.source = source; | |
this.wip = new AtomicInteger(); | |
this.evictionFunction = evictionFunction; | |
this.observers = new AtomicReference<CacheDisposable<T>[]>(EMPTY); | |
} | |
@Override | |
protected void subscribeActual(SingleObserver<? super T> s) { | |
CacheDisposable<T> d = new CacheDisposable<>(s, this); | |
s.onSubscribe(d); | |
if (add(d)) { | |
if (d.isDisposed()) { | |
remove(d); | |
} | |
} else { | |
Throwable ex = error; | |
if (ex != null) { | |
s.onError(ex); | |
} else { | |
s.onSuccess(value); | |
} | |
return; | |
} | |
if (wip.getAndIncrement() == 0) { | |
source.subscribe(this); | |
} | |
} | |
boolean add(CacheDisposable<T> observer) { | |
for (; ; ) { | |
CacheDisposable<T>[] a = observers.get(); | |
if (a == TERMINATED) { | |
return false; | |
} | |
int n = a.length; | |
@SuppressWarnings("unchecked") | |
CacheDisposable<T>[] b = new CacheDisposable[n + 1]; | |
System.arraycopy(a, 0, b, 0, n); | |
b[n] = observer; | |
if (observers.compareAndSet(a, b)) { | |
return true; | |
} | |
} | |
} | |
@SuppressWarnings("unchecked") | |
void remove(CacheDisposable<T> observer) { | |
for (; ; ) { | |
CacheDisposable<T>[] a = observers.get(); | |
int n = a.length; | |
if (n == 0) { | |
return; | |
} | |
int j = -1; | |
for (int i = 0; i < n; i++) { | |
if (a[i] == observer) { | |
j = i; | |
break; | |
} | |
} | |
if (j < 0) { | |
return; | |
} | |
CacheDisposable<T>[] b; | |
if (n == 1) { | |
b = EMPTY; | |
} else { | |
b = new CacheDisposable[n - 1]; | |
System.arraycopy(a, 0, b, 0, j); | |
System.arraycopy(a, j + 1, b, j, n - j - 1); | |
} | |
if (observers.compareAndSet(a, b)) { | |
return; | |
} | |
} | |
} | |
@Override | |
public void onSubscribe(Disposable d) { | |
// not supported by this operator | |
} | |
@Override | |
public void onSuccess(T value) { | |
this.value = value; | |
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) { | |
if (!d.isDisposed()) { | |
d.actual.onSuccess(value); | |
} | |
} | |
try { | |
Completable eviction = evictionFunction.apply(value, null); | |
if (eviction != Completable.never()) { | |
eviction.subscribe(this); | |
} | |
} catch (Exception ex) { | |
throw Exceptions.propagate(ex); | |
} | |
} | |
@Override | |
public void onError(Throwable e) { | |
this.error = e; | |
for (CacheDisposable<T> d : observers.getAndSet(TERMINATED)) { | |
if (!d.isDisposed()) { | |
d.actual.onError(e); | |
} | |
} | |
try { | |
Completable eviction = evictionFunction.apply(null, e); | |
if (eviction != Completable.never()) { | |
eviction.subscribe(this); | |
} | |
} catch (Exception ex) { | |
throw Exceptions.propagate(ex); | |
} | |
} | |
@Override | |
public void onComplete() { | |
observers.set(EMPTY); | |
wip.set(0); | |
} | |
static final class CacheDisposable<T> | |
extends AtomicBoolean | |
implements Disposable { | |
private static final long serialVersionUID = 7514387411091976596L; | |
final SingleObserver<? super T> actual; | |
final EvictableSingleCache<T> parent; | |
CacheDisposable(SingleObserver<? super T> actual, EvictableSingleCache<T> parent) { | |
this.actual = actual; | |
this.parent = parent; | |
} | |
@Override | |
public boolean isDisposed() { | |
return get(); | |
} | |
@Override | |
public void dispose() { | |
if (compareAndSet(false, true)) { | |
parent.remove(this); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Examples:
Cache value for 10 minutes.
Don't cache errors.
Clear cache on button click.