Last active
August 25, 2020 07:48
-
-
Save jamesikanos/84baffaf36244d099ef7a28e473f094b to your computer and use it in GitHub Desktop.
A drop-in app.component.ts replacement for a Stock Ticker Application. Demonstrates how to use both Promises and Observables in the same pipeline.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Component, EventEmitter } from '@angular/core'; | |
import { of, Observable, interval, merge, from, BehaviorSubject } from 'rxjs'; | |
import { switchMap, map, switchMapTo, delayWhen, filter, delay, tap, take, shareReplay, catchError } from 'rxjs/operators'; | |
import { HttpClient } from '@angular/common/http'; | |
// Endpoint used as the stand-in "stock price" feed: asks random.org for a single
// base-10 integer between 1 and 6000, returned as plain text.
const rngUrl = "https://www.random.org/integers/?num=1&min=1&max=6000&col=1&base=10&format=plain&rnd=new";
/**
 * Root component of the stock ticker demo.
 *
 * Shows the most recent price, whether that price came from the live HTTP
 * request or from the Cache Storage entry, whether a refresh is in flight,
 * and the last refresh error (if any). Promise-based cache access and
 * Observable-based HTTP access are combined into a single pipeline.
 */
@Component({
  selector: 'app-root',
  template: `
    <div>
      <h1>Developer Designed Stock Application</h1>
      <button (click)="refreshRequest.emit()" [disabled]="isRefreshing$ | async">Refresh Stock Data</button>
      <strong>Current Stock Price: {{ currentPrice$ | async }}</strong>
      <span *ngIf="isLive$ | async">Data is being streamed live.</span>
      <span *ngIf="!(isLive$ | async)">Data is cached.</span>
      <i *ngIf="isRefreshing$ | async">Data is Refreshing...</i>
      <strong *ngIf="error$ | async as error">Error: {{ error }}</strong>
    </div>
  `,
  styles: [
    '* { display: block; margin: 1rem 0; } h1 { margin: 0 } div { border: 1px solid grey; padding: 0.3em; display: inline-block; }'
  ]
})
export class AppComponent {
  /** Latest price, parsed to a number; replays the most recent value to late subscribers. */
  readonly currentPrice$: Observable<number>;

  /** `true` while the displayed price came from the live request, `false` when it came from the cache. */
  readonly isLive$ = new BehaviorSubject<boolean>(false);

  /** `true` while an HTTP refresh is in flight. */
  readonly isRefreshing$ = new BehaviorSubject<boolean>(false);

  /** Message describing the last failed refresh, or `null` when the last refresh succeeded. */
  readonly error$ = new BehaviorSubject<string | null>(null);

  /** Emits when the user clicks the refresh button. */
  refreshRequest = new EventEmitter<void>();

  constructor(private httpClient: HttpClient) {
    // Observable that emits the cached value once, if one exists.
    // A promise source emits at most one value, so no take(1) is needed.
    const cacheValue$ = from(this._retrieveCacheValue()).pipe(
      // A rejected cache read (e.g. Cache API unavailable) is treated as "nothing cached"
      // so it cannot error the merged price stream below.
      catchError(() => of(null)),
      // Only pass on non-empty values
      filter((cached): cached is string => !!cached),
      tap(() => this.isLive$.next(false))
    );

    // The "seed" that triggers a refresh fires from:
    // 1. A one-off 'of' emission for the initial kick (deferred with delay(0))
    // 2. emit() from refreshRequest
    // 3. interval - every 10 seconds
    const seed$ = merge(of(null).pipe(delay(0)), this.refreshRequest, interval(10000));

    // Observable that requests the live price each time it is subscribed to
    const livePrice$ = this._fetchAndStore().pipe(
      tap(() => {
        this.isLive$.next(true);
        // Reset the error (if one was set previously)
        this.error$.next(null);
      }),
      // On any failure fall back to the cached value. Handling the error here, inside the
      // inner observable, keeps the outer seed subscription alive for the next refresh.
      catchError(() => this._handleError())
    );

    // Every seed emission cancels any in-flight request and starts a fresh one
    const wrappedLivePrice$ = seed$.pipe(switchMapTo(livePrice$));

    // Current price: a merge of the cached and live values
    this.currentPrice$ = merge(cacheValue$, wrappedLivePrice$).pipe(
      // Parse the text body into a number (explicit radix)
      map(raw => parseInt(raw, 10)),
      // Buffer only the latest price; the argument-less form would retain and replay every price
      shareReplay(1)
    );
  }

  /**
   * Fallback used when the live request fails: flags the error in the UI state
   * and emits the cached value, if there is one.
   *
   * @returns Observable that emits the cached text when present and then completes;
   *          completes without emitting when nothing usable is cached.
   */
  private _handleError(): Observable<string> {
    return from(this._retrieveCacheValue()).pipe(
      // If the cache read itself rejects, still update the UI state below instead of
      // propagating an error that would terminate the price stream.
      catchError(() => of(null)),
      tap(() => {
        this.error$.next("Unable to refresh");
        this.isRefreshing$.next(false);
        this.isLive$.next(false); // Inform that we're using the cache
      }),
      // Do not emit an empty cache result; it would be parsed to NaN downstream
      filter((cached): cached is string => !!cached)
    );
  }

  /**
   * Creates the HTTP Observable and stores the fetched value in the cache.
   *
   * @returns Cold Observable that, per subscription, raises `isRefreshing$`, requests
   *          the price as text, waits for the cache write, emits the text and lowers
   *          `isRefreshing$` again. Errors are propagated to the caller.
   */
  private _fetchAndStore(): Observable<string> {
    // Request the price and hold the value back until the cache write has finished
    const fetch$ = this.httpClient
      .get(rngUrl, { responseType: 'text' })
      .pipe(delayWhen(body => from(this._storeInCache(body))));

    // Wrap fetch$ so the isRefreshing$ flag is toggled around each subscription
    return of(true).pipe(
      tap(() => this.isRefreshing$.next(true)),
      switchMap(() => fetch$),
      tap(() => this.isRefreshing$.next(false))
    );
  }

  /**
   * Retrieves the value from "/stockdata.txt" in the "StockData" cache.
   *
   * @returns The stored text, or `null` if nothing is stored.
   */
  private async _retrieveCacheValue(): Promise<string | null> {
    const cache = await caches.open("StockData");
    const cacheMatch = await cache.match("/stockdata.txt");
    if (!cacheMatch) {
      return null;
    }
    return await cacheMatch.text();
  }

  /**
   * Stores the value in the "/stockdata.txt" location of the "StockData" cache.
   *
   * @param newValue Value to store in cache
   */
  private async _storeInCache(newValue: string): Promise<void> {
    // Open the Cache
    const cache = await caches.open("StockData");
    // Create the Data Blob containing the value to cache
    const dataBlob = new Blob([newValue], { type: "text/plain" });
    // Store the cache data
    await cache.put("/stockdata.txt", new Response(dataBlob));
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This file accompanies the article: https://medium.com/@jamesikanos/rxjs-tips-promises-into-observables-468e9534b5d9