Skip to content

Instantly share code, notes, and snippets.

@mattiamanzati
Created May 19, 2023 14:51

Revisions

  1. mattiamanzati created this gist May 19, 2023.
    119 changes: 119 additions & 0 deletions dataloader.ts
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,119 @@
    import * as DU from "@effect/data/Duration"
    import { pipe } from "@effect/data/Function"
    import * as HM from "@effect/data/HashMap"
    import * as O from "@effect/data/Option"
    import * as D from "@effect/io/Deferred"
    import * as T from "@effect/io/Effect"
    import * as FID from "@effect/io/Fiber/Id"

    /**
    * Given a function (key: K) => Effect<R, E, A> this DataLoader
    * batches and caches requests such as concurrent lookups
    * for the same key cannot happen.
    * Subsequent lookups will use the cache value, if any.
    */
    export class DataLoader<K, R, E, A> {
    cacheMap: HM.HashMap<K, A> = HM.empty()
    pendingArray: [K, D.Deferred<E, A>][] = []
    semaphore = T.unsafeMakeSemaphore(1)

    constructor(
    readonly resolve: (key: K[]) => T.Effect<R, E, A[]>,
    readonly cachingEnabled: boolean
    ) {}

    invalidateAll() {
    return T.sync(() => {
    this.cacheMap = HM.empty()
    })
    }

    run(): T.Effect<R, E, void> {
    return pipe(
    T.unit(),
    T.delay(DU.millis(1)),
    T.zipRight(
    T.sync(() => {
    const items = this.pendingArray.splice(0)
    let indexMap: HM.HashMap<K, D.Deferred<E, A>[]> = HM.make() as any
    for (const [key, deferred] of items) {
    indexMap = pipe(
    indexMap,
    HM.modifyAt(key, (list) =>
    pipe(
    list,
    O.getOrElse(() => [] as D.Deferred<E, A>[]),
    (_) => O.some(_.concat([deferred]))
    )
    )
    )
    }
    return indexMap
    })
    ),
    T.flatMap((indexMap) => {
    const keys = Array.from(HM.keys(indexMap))
    const allDeferreds = pipe(
    indexMap,
    HM.values,
    (_) => Array.from(_),
    (_) => _.flat()
    )
    return keys.length > 0
    ? pipe(
    this.resolve(keys),
    T.flatMap(
    T.forEachWithIndex((result, i) =>
    pipe(
    indexMap,
    HM.unsafeGet(keys[i]),
    T.forEachPar((deferred) => pipe(deferred, D.succeed(result)))
    )
    )
    ),
    T.catchAllCause((cause) =>
    pipe(
    allDeferreds,
    T.forEachPar((deferred) => pipe(deferred, D.failCause(cause)))
    )
    ),
    T.raceFirst(
    pipe(
    allDeferreds,
    T.forEach((deferred) =>
    pipe(
    D.await(deferred),
    T.catchAllCause(() => T.unit())
    )
    )
    )
    ),
    T.asUnit
    )
    : T.unit()
    }),
    this.semaphore.withPermits(1),
    T.forkDaemon,
    T.asUnit
    )
    }

    lookup(key: K): T.Effect<R, E, A> {
    const cacheEntry = pipe(this.cacheMap, HM.get(key))
    if (!this.cachingEnabled)
    return pipe(
    this.resolve([key]),
    T.map((_) => _[0])
    )
    if (this.cachingEnabled && O.isSome(cacheEntry)) return T.succeed(cacheEntry.value)
    return T.acquireUseRelease(
    T.sync(() => {
    const deferred = D.unsafeMake<E, A>(FID.none)
    this.pendingArray.push([key, deferred])
    return deferred
    }),
    (deferred) => pipe(this.run(), T.zipParRight(D.await(deferred))),
    (deferred) => D.interrupt(deferred)
    )
    }
    }