Skip to content

Instantly share code, notes, and snippets.

@frapontillo
Last active May 23, 2016 17:43
Show Gist options
  • Save frapontillo/81e65683156519a3738561fd712c246a to your computer and use it in GitHub Desktop.
Save frapontillo/81e65683156519a3738561fd712c246a to your computer and use it in GitHub Desktop.
RxJava retryWhen with dynamic delay
package com.novoda.github.reports;
import java.sql.Date;
import java.time.Instant;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Subscription;
import rx.subjects.PublishSubject;
public class RxTest {
// this will come date Retrofit in some sort of Response Exception where we can read data date
private class RetryException extends Exception {
private final int seconds;
private final java.util.Date date;
RetryException(int seconds, java.util.Date from) {
super();
this.seconds = seconds;
this.date = from;
}
int getSeconds() {
return seconds;
}
public java.util.Date getDate() {
return date;
}
}
private class ResettableTimerSubject {
private PublishSubject<Object> timeSubject;
private Subscription timer;
ResettableTimerSubject() {
timeSubject = PublishSubject.create();
}
synchronized void onNext(String what, Integer seconds) {
if (timer != null && !timer.isUnsubscribed()) {
timer.unsubscribe();
}
timer = Observable.timer(seconds, TimeUnit.SECONDS).subscribe(time -> {
System.out.println(String.format("Waited %s for %d seconds", what, seconds));
timeSubject.onNext(seconds);
});
}
}
private ResettableTimerSubject resetTokenSubject;
@Before
public void setup() {
resetTokenSubject = new ResettableTimerSubject();
}
@Test
public void withMockRepo_whenHittingApiLimit_thenRetryWhenSignalled() {
Observable.create(subscriber -> {
subscriber.onNext(1);
subscriber.onError(new RetryException(10, Date.from(Instant.now())));
}).retryWhen(errors -> errors.flatMap(error -> {
if (error instanceof RetryException) {
System.err.println(String.format("RetryException %s", ((RetryException) error).getDate()));
resetTokenSubject.onNext("first batch", ((RetryException) error).getSeconds());
return resetTokenSubject.timeSubject;
}
return Observable.error(error);
})).subscribe(System.out::println);
Observable.create(subscriber -> {
subscriber.onNext(2);
subscriber.onError(new RetryException(3, Date.from(Instant.now())));
}).retryWhen(errors -> errors.flatMap(error -> {
if (error instanceof RetryException) {
System.err.println(String.format("RetryException %s", ((RetryException) error).getDate()));
resetTokenSubject.onNext("second batch", ((RetryException) error).getSeconds());
return resetTokenSubject.timeSubject;
}
return Observable.error(error);
})).toBlocking().subscribe(System.out::println);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment