Last active
August 29, 2015 14:15
Revisions
-
fpillet revised this gist
Feb 20, 2015 . 1 changed file with 2 additions and 1 deletion.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -65,6 +65,7 @@ - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACSchedul void (^sendLatest)() = ^{ @synchronized (timerDisposable) { [timerDisposable.disposable dispose]; timerDisposable.disposable = nil; if (latestValue) { [subscriber sendNext:latestValue]; latestValue = nil; @@ -74,7 +75,7 @@ - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACSchedul RACDisposable *selfDisposable = [self subscribeNext:^(id x) { @synchronized (timerDisposable) { if (timerDisposable.disposable != nil) latestValue = x; else { [subscriber sendNext:x]; -
fpillet created this gist
Feb 19, 2015 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,35 @@ // // Created by Florent Pillet on 30/01/15. // #import <Foundation/Foundation.h> @interface RACSignal (FPOperations) /// Delivers the receiver's latest `next`s with a minimum of `interval` /// seconds between two values. Of `next` values produced by the receiver /// in a period of `interval` seconds, only the last one is kept and sent /// by the return signal; intermediate values are discarded. /// /// interval - The interval in which values are grouped into one buffer. /// scheduler - The scheduler upon which the returned signal will deliver its /// values. This must not be nil or +[RACScheduler /// immediateScheduler]. /// /// Returns a signal which sends the latest value at (at least) each interval on `scheduler`. /// When the receiver completes, any unsent latest value will be sent immediately before /// the signal completes. - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler; /// Resubscribes to the receiver when it completes, after waiting for the specified delay. /// This is the equivalent of the standard -repeat operation, with the addition of a delay to /// avoid fast cycles for signals that complete quickly. /// /// delay - The delay after which the receiver is resubscribed to, on the scheduler that is /// current at the time the receiver completes / errors. /// /// The returned signal will pass all `next' to its subscribers. On `error' it will error too, /// effectively breaking the re-subscription cycle. - (RACSignal *)repeatAfter:(NSTimeInterval)delay; @end This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,115 @@ // // Created by Florent Pillet on 30/01/15. // #import "RACSignal+FPOperations.h" @implementation RACSignal (FPOperations) // Code extracted from ReactiveCococa and modified to suit my needs // Subscribes to the given signal with the given blocks. // // If the signal errors or completes, the corresponding block is invoked. If the // disposable passed to the block is _not_ disposed, then the signal is // subscribed to again. static RACDisposable *subscribeForever (RACSignal *signal, NSTimeInterval delay, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) { next = [next copy]; error = [error copy]; completed = [completed copy]; RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable]; RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) { RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable]; [compoundDisposable addDisposable:selfDisposable]; __weak RACDisposable *weakSelfDisposable = selfDisposable; RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) { @autoreleasepool { error(e, compoundDisposable); [compoundDisposable removeDisposable:weakSelfDisposable]; } [RACScheduler.currentScheduler afterDelay:delay schedule:recurse]; } completed:^{ @autoreleasepool { completed(compoundDisposable); [compoundDisposable removeDisposable:weakSelfDisposable]; } [RACScheduler.currentScheduler afterDelay:delay schedule:recurse]; }]; [selfDisposable addDisposable:subscriptionDisposable]; }; // Subscribe once immediately, and then use recursive scheduling for any // further resubscriptions. recursiveBlock(^{ RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler]; RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock]; [compoundDisposable addDisposable:schedulingDisposable]; }); return compoundDisposable; } - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler { NSCParameterAssert(scheduler != nil); NSCParameterAssert(scheduler != RACScheduler.immediateScheduler); return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init]; __block id latestValue = nil; void (^sendLatest)() = ^{ @synchronized (timerDisposable) { [timerDisposable.disposable dispose]; if (latestValue) { [subscriber sendNext:latestValue]; latestValue = nil; } } }; RACDisposable *selfDisposable = [self subscribeNext:^(id x) { @synchronized (timerDisposable) { if (timerDisposable != nil) latestValue = x; else { [subscriber sendNext:x]; timerDisposable.disposable = [scheduler afterDelay:interval schedule:sendLatest]; } } } error:^(NSError *error) { [subscriber sendError:error]; } completed:^{ sendLatest(); [subscriber sendCompleted]; }]; return [RACDisposable disposableWithBlock:^{ [selfDisposable dispose]; [timerDisposable dispose]; }]; }] setNameWithFormat:@"[%@] -sampleAtInterval: %f onScheduler: %@", self.name, (double)interval, scheduler]; } - (RACSignal *)repeatAfter:(NSTimeInterval)delay { return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) { return subscribeForever(self, delay, ^(id x) { [subscriber sendNext:x]; }, ^(NSError *error, RACDisposable *disposable) { [disposable dispose]; [subscriber sendError:error]; }, ^(RACDisposable *disposable) { // Resubscribe. }); }] setNameWithFormat:@"[%@] -repeatAfter: %f", self.name, delay]; } @end