Last active
February 20, 2021 09:19
-
-
Save TotooriaHyperion/0e758558f54ad543047b8db8fbd25921 to your computer and use it in GitHub Desktop.
react - rxjs & reactive
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ReactDOM from "react-dom"; | |
import { Observable } from "rxjs"; | |
/**
 * Mirrors `obs`, delivering every `next` value to the downstream observer from
 * inside `ReactDOM.unstable_batchedUpdates`.
 *
 * Wrap this around every observable *source* (e.g. `from()`, a subject,
 * `fromEvent()`, `fromFetch()`) so that all subscribers run inside
 * `unstable_batchedUpdates`.
 *
 * @param obs - The source observable to wrap.
 * @returns An observable that subscribes to `obs` once per subscriber and
 *   forwards `next` (batched), `error` and `complete` unchanged.
 */
export const reactBatch = <T>(obs: Observable<T>): Observable<T> =>
  new Observable<T>((downstream) => {
    // In a test environment (unit-testing the model), unstable_batchedUpdates
    // has to be swapped for a plain direct call.
    const forwardBatched = (value: T): void => {
      ReactDOM.unstable_batchedUpdates(() => downstream.next(value));
    };

    // Returning the inner subscription ties its teardown to the outer one.
    return obs.subscribe({
      next: forwardBatched,
      error: (err) => downstream.error(err),
      complete: () => downstream.complete(),
    });
  });
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { useDebugValue, useMemo } from "react"; | |
import { Observable, Subscription as RxSubscription } from "rxjs"; | |
import { Subscription, useSubscription } from "use-subscription"; | |
/**
 * React hook that tracks the latest value of several observables at once.
 *
 * The returned array is positional: element `i` is the most recent value seen
 * from `obs[i]`, or `undefined` if that observable has not emitted yet. A new
 * array instance is produced only after at least one slot changed, so the
 * result is safe to compare by reference.
 *
 * @param obs - The observables to track. They are also used as the `useMemo`
 *   dependency list, so the subscription is rebuilt when any of them changes
 *   identity.
 * @returns The latest values, one slot per observable.
 */
export function useObservables<T extends [...Observable<any>[]]>(
  ...obs: T
): {
  [key in keyof T]: T[key] extends Observable<infer V> ? V | undefined : never;
} {
  const subscription = useMemo<Subscription<any>>(() => {
    let dirty = false;
    // One slot per observable, so the array always has the arity promised by
    // the return type, even before anything has emitted.
    let value: unknown[] = new Array<unknown>(obs.length).fill(undefined);

    // Grab the initial value where possible: only observables that emit
    // synchronously on subscribe fill their slot here.
    // NOTE(review): subscribing and immediately unsubscribing may trigger side
    // effects on observables that do work per subscription — confirm callers
    // only pass sources for which that is acceptable.
    obs.forEach((ob, idx) => {
      ob.subscribe((v) => {
        value[idx] = v;
      }).unsubscribe();
    });

    return {
      subscribe: (cb) => {
        // Parent subscription: a single unsubscribe tears down every child.
        const sub = new RxSubscription();
        obs.forEach((ob, idx) => {
          sub.add(
            ob.subscribe((v) => {
              if (v !== value[idx]) {
                value[idx] = v;
                // Mark dirty so the next read hands out a fresh array.
                dirty = true;
                // Trigger an update.
                cb();
              }
            }),
          );
        });
        return () => sub.unsubscribe();
      },
      getCurrentValue: () => {
        if (dirty) {
          // For immutability: copy so an array already handed out is never
          // mutated by later emissions.
          value = [...value];
          dirty = false;
        }
        return value;
      },
    };
  }, obs);

  const result = useSubscription(subscription);
  useDebugValue(result);
  return result;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { useDebugValue, useMemo } from "react"; | |
import { Observable, Subscription as RxSubscription } from "rxjs"; | |
import { Subscription, useSubscription } from "use-subscription"; | |
/**
 * React hook that tracks the latest value of several observables at once,
 * subscribing to each of them lazily and only once per activation.
 *
 * The returned array is positional: element `i` is the most recent value seen
 * from `obs[i]`, or `undefined` if that observable has not emitted yet. A new
 * array instance is produced only after at least one slot changed.
 *
 * @param obs - The observables to track. They are also used as the `useMemo`
 *   dependency list, so the subscription is rebuilt when any of them changes
 *   identity.
 * @returns The latest values, one slot per observable.
 */
export function useObservables<T extends [...Observable<any>[]]>(
  ...obs: T
): {
  [key in keyof T]: T[key] extends Observable<infer V> ? V | undefined : never;
} {
  const subscription = useMemo<Subscription<any>>(() => {
    let dirty = false;
    // Working copy written by the observable callbacks. Pre-sized so the
    // array always has the arity promised by the return type.
    const value: unknown[] = new Array<unknown>(obs.length).fill(undefined);
    // Snapshot handed out to React; replaced (never mutated) when dirty.
    let result: unknown[] = [...value];
    let update: (() => void) | null = null;
    let sub: RxSubscription | null = null;
    let activated = false;

    const bootstrap = () => {
      // Perform the subscription.
      if (!sub) {
        // Add through a local handle: if cleanUp() runs while a source is
        // emitting synchronously, `sub` is already null, and adding to the
        // closed parent simply unsubscribes the child instead of throwing.
        const parent = new RxSubscription();
        sub = parent;
        obs.forEach((ob, idx) => {
          parent.add(
            ob.subscribe((v) => {
              if (v !== value[idx]) {
                value[idx] = v;
                // Mark dirty so the next read builds a fresh snapshot.
                dirty = true;
                // Trigger an update (no-op until subscribe() registered one).
                update?.();
              }
            }),
          );
        });
      }
    };

    const tryActivate = () => {
      // Activate the subscription.
      // Why? We do not want to subscribe().unsubscribe() inside
      // getCurrentValue, because that affects the connect behaviour of rxjs
      // observables. However, use-subscription calls getCurrentValue before
      // it subscribes in order to read the current value, so getCurrentValue
      // has to trigger the subscription too — while the whole lifecycle
      // should perform exactly one subscribe and one unsubscribe.
      // tryActivate therefore lets both getCurrentValue and subscribe
      // activate, with the actual subscription happening only once.
      if (!activated) {
        activated = true;
        bootstrap();
      }
    };

    const cleanUp = () => {
      // Unsubscribe and return to the inactive state.
      sub?.unsubscribe();
      sub = null;
      update = null;
      activated = false;
    };

    return {
      subscribe: (cb) => {
        tryActivate();
        update = cb;
        return cleanUp;
      },
      getCurrentValue: () => {
        // NOTE(review): cleanUp is only reachable through subscribe()'s return
        // value; if getCurrentValue is called but subscribe never is, nothing
        // in this block tears the subscriptions down — confirm this cannot
        // happen with the way use-subscription drives this object.
        tryActivate();
        if (dirty) {
          // For immutability: hand out a fresh copy.
          result = [...value];
          dirty = false;
        }
        return result;
      },
    };
  }, obs);

  const result = useSubscription(subscription);
  useDebugValue(result);
  return result;
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment