@danielt1263
Last active February 20, 2024 13:41
The pollContinuously operator is useful if you want to subscribe to an Observable again after it finishes...
//
// PollContinuously+Rx.swift
//
// Created by Daniel Tartaglia on 20 Jun 2023.
// Copyright © 2023 Daniel Tartaglia. MIT License.
//
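import RxSwift

extension ObservableType {
    func pollContinuously(delay: RxTimeInterval, scheduler: SchedulerType) -> Observable<Element> {
        .create { observer in
            let subject = PublishSubject<Element>()
            // Forward everything that passes through the subject to the downstream observer.
            let output = subject
                .subscribe(observer)
            // Feedback loop: each value passing through the subject schedules a delayed
            // re-subscription to `self`, whose events are fed back into the subject.
            // `concatMap` runs these re-subscriptions one at a time.
            let loop = subject
                .concatMap { _ in
                    Observable.just(())
                        .delay(delay, scheduler: scheduler)
                        .flatMap { self }
                }
                .observe(on: scheduler)
                .subscribe(subject)
            // Initial subscription. Appending `.never()` keeps the source's completion
            // from completing the subject.
            let input = Observable.concat(self.asObservable(), .never())
                .subscribe(subject)
            return CompositeDisposable(output, loop, input)
        }
    }
}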

import RxTest
import XCTest

// Note: createObservable(timeline:), parseEventsAndTimes(timeline:values:) and offsetTime(by:)
// are marble-test helpers that are not part of RxTest itself; they are assumed to be
// available in the test target.
class PollContinuouslyTests: XCTestCase {
    func test1() {
        let scheduler = TestScheduler(initialClock: 0)
        let source = scheduler.createObservable(timeline: "-A|-B|-C|-D|")
        let expected = parseEventsAndTimes(timeline: "-A--B--C--D|", values: { String($0) })
            .offsetTime(by: 200)
        let result = scheduler.start {
            source.pollContinuously(delay: .seconds(1), scheduler: scheduler)
                .take(4)
        }
        XCTAssertEqual(result.events, expected[0])
    }

    func test2() {
        let scheduler = TestScheduler(initialClock: 0)
        let source = scheduler.createObservable(timeline: "-A|-B|-C|-D|")
        let expected = parseEventsAndTimes(timeline: "-A---B---C---D|", values: { String($0) })
            .offsetTime(by: 200)
        let result = scheduler.start {
            source.pollContinuously(delay: .seconds(2), scheduler: scheduler)
                .take(4)
        }
        XCTAssertEqual(result.events, expected[0])
    }

    func test2_2() {
        let scheduler = TestScheduler(initialClock: 0)
        let source = scheduler.createObservable(timeline: "-A-B|-C-D|-E-F|-G-H|")
        let expected = parseEventsAndTimes(timeline: "-A-B-C-D--E-F--G-H|", values: { String($0) })
            .offsetTime(by: 200)
        let result = scheduler.start {
            source.pollContinuously(delay: .seconds(2), scheduler: scheduler)
                .take(8)
        }
        XCTAssertEqual(result.events, expected[0])
    }
}
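
A rough usage sketch, not part of the gist: assuming the pollContinuously extension above is compiled into the same module, a call site might look like the following. The names fetchStatus() and pollStatus(every:on:) are hypothetical and exist only for illustration.

```swift
import RxSwift

// Hypothetical one-shot request: emits a single value, then completes.
func fetchStatus() -> Observable<String> {
    Observable.just("ok")
}

// Hypothetical wrapper. It relies on the pollContinuously(delay:scheduler:)
// extension from the gist being in scope. The scheduler is injected by the
// caller rather than hard-coded.
func pollStatus(every interval: RxTimeInterval, on scheduler: SchedulerType) -> Observable<String> {
    fetchStatus()
        .pollContinuously(delay: interval, scheduler: scheduler)
}
```

Injecting the scheduler mirrors the tests above, which pass a TestScheduler so that the delay runs in virtual time.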
@giofid

giofid commented Feb 20, 2024

Hello @danielt1263,
I noticed a "Reentrancy anomaly was detected" warning when an error occurs during polling. I wrote a code snippet to reproduce the problem.

struct MyError: Error { }

func polling() -> Observable<Int> {
    observable().pollContinuously(delay: .seconds(1), scheduler: MainScheduler.instance).take(6)
}

func observable() -> Observable<Int> {
    Observable<Int>.create { observer in
        observer.onError(MyError())
        return Disposables.create {}
    }
}

....

polling()
    .observe(on: MainScheduler.instance)
    .subscribe { value in
        print(value)
    } onError: { error in
        print(error)
    } onDisposed: {
        print("Disposed")
    }
    .disposed(by: disposeBag)

@danielt1263

Author

@giofid That is expected behavior with any feedback loop such as this, and is a case where you should be using MainScheduler.asyncInstance instead of .instance.

If you want, you can force the issue like RxFeedback does. See the property var async on line 141 for more information.
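
To illustrate why a feedback loop runs into this, here is a minimal sketch that is independent of pollContinuously. The function names synchronousLoop and asynchronousLoop are made up for this example. In debug builds of RxSwift, the synchronous variant is the kind of pattern that typically produces the reentrancy warning, because the subject receives a new event while it is still delivering the previous one.

```swift
import RxSwift

/// Synchronous feedback: the handler pushes the next value into the same
/// subject while that subject is still delivering the current value.
func synchronousLoop(_ subject: PublishSubject<Int>, limit: Int) -> Disposable {
    subject.subscribe(onNext: { value in
        if value < limit { subject.onNext(value + 1) }  // re-enters the subject
    })
}

/// Asynchronous feedback: each event is first dispatched asynchronously to
/// the main queue, so the previous delivery has already returned by the time
/// the next onNext is sent.
func asynchronousLoop(_ subject: PublishSubject<Int>, limit: Int) -> Disposable {
    subject
        .observe(on: MainScheduler.asyncInstance)
        .subscribe(onNext: { value in
            if value < limit { subject.onNext(value + 1) }
        })
}
```

Both closures capture the subject strongly, so the returned Disposable should be disposed to break the cycle.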

@giofid

giofid commented Feb 20, 2024

Thank you @danielt1263 for your feedback.

Sorry, I didn't understand your last observation.

"If you want, you can force the issue like RxFeedback does. See the property var async on line 141 for more information."

Can you please explain that better?

@danielt1263

Author

It's fully explained on line 141 of the link. I can't do better than it does.

@giofid

giofid commented Feb 20, 2024

I think I understand what you meant now: use let asyncScheduler = scheduler.async instead of scheduler inside pollContinuously.

Thanks as always Daniel!
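
A minimal sketch of what such an async property could look like. This is not a copy of RxFeedback's source. The extension on SchedulerType below is an assumption written for this example, and it only swaps the scheduler when it is MainScheduler.

```swift
import RxSwift

extension SchedulerType {
    /// Hypothetical helper: returns MainScheduler.asyncInstance when the
    /// receiver is MainScheduler, so that work is always dispatched
    /// asynchronously. Any other scheduler (for example a TestScheduler)
    /// is returned unchanged.
    var async: SchedulerType {
        if self is MainScheduler {
            return MainScheduler.asyncInstance
        }
        return self
    }
}
```

Inside pollContinuously, the result of scheduler.async would then be passed to both delay(_:scheduler:) and observe(on:) in place of scheduler.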

@danielt1263

Author

Yes, exactly. Either that or just pass MainScheduler.asyncInstance to pollContinuously in the first place.
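
Applied to the reproduction from earlier in the thread, the second option is a one-argument change. The sketch below assumes the pollContinuously extension from the gist is in scope. Per the replies above this is expected to avoid the warning, though that is not verified here.

```swift
import RxSwift

struct MyError: Error { }

// Same failing source as in the report: errors immediately on subscription.
func observable() -> Observable<Int> {
    Observable<Int>.create { observer in
        observer.onError(MyError())
        return Disposables.create()
    }
}

// Only the scheduler argument differs from the original snippet:
// MainScheduler.asyncInstance instead of MainScheduler.instance.
func polling() -> Observable<Int> {
    observable()
        .pollContinuously(delay: .seconds(1), scheduler: MainScheduler.asyncInstance)
        .take(6)
}
```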