Skip to content

Instantly share code, notes, and snippets.

@fpillet
Last active August 29, 2015 14:15

Revisions

  1. fpillet revised this gist Feb 20, 2015. 1 changed file with 2 additions and 1 deletion.
    3 changes: 2 additions & 1 deletion RACSignal+FPOperations.m
    Original file line number Diff line number Diff line change
    @@ -65,6 +65,7 @@ - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACSchedul
    void (^sendLatest)() = ^{
    @synchronized (timerDisposable) {
    [timerDisposable.disposable dispose];
    timerDisposable.disposable = nil;
    if (latestValue) {
    [subscriber sendNext:latestValue];
    latestValue = nil;
    @@ -74,7 +75,7 @@ - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACSchedul

    RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
    @synchronized (timerDisposable) {
    if (timerDisposable != nil)
    if (timerDisposable.disposable != nil)
    latestValue = x;
    else {
    [subscriber sendNext:x];
  2. fpillet created this gist Feb 19, 2015.
    35 changes: 35 additions & 0 deletions RACSignal+FPOperations.h
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,35 @@
    //
    // Created by Florent Pillet on 30/01/15.
    //

    #import <Foundation/Foundation.h>

    @interface RACSignal (FPOperations)

    /// Delivers the receiver's latest `next`s with a minimum of `interval`
    /// seconds between two values. Of `next` values produced by the receiver
    /// in a period of `interval` seconds, only the last one is kept and sent
    /// by the return signal; intermediate values are discarded.
    ///
    /// interval - The interval in which values are grouped into one buffer.
    /// scheduler - The scheduler upon which the returned signal will deliver its
    /// values. This must not be nil or +[RACScheduler
    /// immediateScheduler].
    ///
    /// Returns a signal which sends the latest value at (at least) each interval on `scheduler`.
    /// When the receiver completes, any unsent latest value will be sent immediately before
    /// the signal completes.
    - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler;

    /// Resubscribes to the receiver when it completes, after waiting for the specified delay.
    /// This is the equivalent of the standard -repeat operation, with the addition of a delay to
    /// avoid fast cycles for signals that complete quickly.
    ///
    /// delay - The delay after which the receiver is resubscribed to, on the scheduler that is
    /// current at the time the receiver completes / errors.
    ///
    /// The returned signal will pass all `next' to its subscribers. On `error' it will error too,
    /// effectively breaking the re-subscription cycle.
    - (RACSignal *)repeatAfter:(NSTimeInterval)delay;

    @end
    115 changes: 115 additions & 0 deletions RACSignal+FPOperations.m
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,115 @@
    //
    // Created by Florent Pillet on 30/01/15.
    //

    #import "RACSignal+FPOperations.h"

    @implementation RACSignal (FPOperations)

    // Code extracted from ReactiveCococa and modified to suit my needs
    // Subscribes to the given signal with the given blocks.
    //
    // If the signal errors or completes, the corresponding block is invoked. If the
    // disposable passed to the block is _not_ disposed, then the signal is
    // subscribed to again.
    static RACDisposable *subscribeForever (RACSignal *signal, NSTimeInterval delay, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
    next = [next copy];
    error = [error copy];
    completed = [completed copy];

    RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];

    RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
    RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
    [compoundDisposable addDisposable:selfDisposable];

    __weak RACDisposable *weakSelfDisposable = selfDisposable;

    RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
    @autoreleasepool {
    error(e, compoundDisposable);
    [compoundDisposable removeDisposable:weakSelfDisposable];
    }
    [RACScheduler.currentScheduler afterDelay:delay schedule:recurse];
    } completed:^{
    @autoreleasepool {
    completed(compoundDisposable);
    [compoundDisposable removeDisposable:weakSelfDisposable];
    }
    [RACScheduler.currentScheduler afterDelay:delay schedule:recurse];
    }];

    [selfDisposable addDisposable:subscriptionDisposable];
    };

    // Subscribe once immediately, and then use recursive scheduling for any
    // further resubscriptions.
    recursiveBlock(^{
    RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];

    RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
    [compoundDisposable addDisposable:schedulingDisposable];
    });

    return compoundDisposable;
    }

    - (RACSignal *)sampleAtInterval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
    NSCParameterAssert(scheduler != nil);
    NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);

    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
    __block id latestValue = nil;

    void (^sendLatest)() = ^{
    @synchronized (timerDisposable) {
    [timerDisposable.disposable dispose];
    if (latestValue) {
    [subscriber sendNext:latestValue];
    latestValue = nil;
    }
    }
    };

    RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
    @synchronized (timerDisposable) {
    if (timerDisposable != nil)
    latestValue = x;
    else {
    [subscriber sendNext:x];
    timerDisposable.disposable = [scheduler afterDelay:interval schedule:sendLatest];
    }
    }
    } error:^(NSError *error) {
    [subscriber sendError:error];
    } completed:^{
    sendLatest();
    [subscriber sendCompleted];
    }];

    return [RACDisposable disposableWithBlock:^{
    [selfDisposable dispose];
    [timerDisposable dispose];
    }];
    }] setNameWithFormat:@"[%@] -sampleAtInterval: %f onScheduler: %@", self.name, (double)interval, scheduler];
    }

    - (RACSignal *)repeatAfter:(NSTimeInterval)delay
    {
    return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
    return subscribeForever(self, delay,
    ^(id x) {
    [subscriber sendNext:x];
    },
    ^(NSError *error, RACDisposable *disposable) {
    [disposable dispose];
    [subscriber sendError:error];
    },
    ^(RACDisposable *disposable) {
    // Resubscribe.
    });
    }] setNameWithFormat:@"[%@] -repeatAfter: %f", self.name, delay];
    }

    @end