Skip to content

Instantly share code, notes, and snippets.

@hiredman
Created August 22, 2014 01:12

Revisions

  1. hiredman created this gist Aug 22, 2014.
    107 changes: 107 additions & 0 deletions join.clj
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,107 @@
    ;; 18. All warfare is based on deception.
    (ns pullrequest.join
    (:require [clojure.core.reducers :as r]
    [clojure.core.protocols :as p])
    (:import (java.util.concurrent.atomic AtomicLong)))

    (defn update-index [key-fn h i]
    (update-in h [(key-fn i)] (fnil conj #{}) [0 i]))

    (defn probe
    ([h [key-fn coll id]]
    (reduce
    (fn bar [h i]
    (let [k (key-fn i)]
    (if (empty? (get h k))
    h
    (update-in h [k] (fnil conj #{}) [id i]))))
    h
    coll)))

    (defn merge-relations
    "given a base tuple, and possible merge tuples, return a reducible
    containing merged tuples"
    [[x & xs] base]
    (if (nil? x)
    [base]
    (r/mapcat
    (fn [v]
    (let [v (second v)]
    (merge-relations xs (merge base v))))
    (set (second x)))))

    (defn hash-relation [relation]
    (let [[key-fn coll] relation]
    (reduce
    (partial update-index key-fn)
    (sorted-map)
    (second relation))))

    (defn fold-fold [combinef reducef coll]
    (reify
    p/CollReduce
    (coll-reduce [_ fun]
    (r/reduce fun (r/reduce reducef (combinef) coll)))
    (coll-reduce [_ fun init]
    (r/reduce fun init (r/reduce reducef (combinef) coll)))
    p/IKVReduce
    (kv-reduce [_ fun init]
    (r/reduce fun init (r/reduce reducef (combinef) coll)))
    r/CollFold
    (coll-fold [_ n fcombinef freducef]
    (r/fold n fcombinef freducef
    (r/fold n combinef reducef coll)))))

    (defn join
    "joins two or more relations (generally collections of maps)
    input is some number of [key-fn coll], key-fn being applied to each
    item of the collection to determine which item of the other
    collections to join"
    [relations]
    {:pre [(or (even? (count relations))
    (map? relations))]}
    (let [[relation & relations] (if (map? relations)
    (seq relations)
    (map vec (partition-all 2 relations)))
    rels (inc (count relations))
    hash-of-first (hash-relation relation)
    n (AtomicLong. 1)]
    (->> (vec relations)
    (r/map (fn [relation] (conj relation (.incrementAndGet n))))
    (fold-fold (r/monoid (partial merge-with into)
    (constantly hash-of-first))
    probe)
    (r/map (fn [_ vals] (group-by first vals)))
    (r/filter #(= rels (count %)))
    (r/mapcat (fn [gv] (merge-relations (seq gv) {})))
    (into []))))


    (comment

    (assert (= (join
    [:baz (for [i (shuffle (range 100))
    :when (>= 60 i 50)]
    {:baz (str i)
    :quuz (* i 10)})
    :foo (for [i (shuffle (range 100))]
    {:foo (str i)
    :bar (* i 10)})
    :lucy (for [i (shuffle (range 100))]
    {:lucy (str i)
    :quuz (* i 10)})])
    [{:baz "50", :quuz 500, :lucy "50", :bar 500, :foo "50"}
    {:lucy "51", :quuz 510, :baz "51", :bar 510, :foo "51"}
    {:bar 520, :foo "52", :lucy "52", :quuz 520, :baz "52"}
    {:lucy "53", :quuz 530, :baz "53", :bar 530, :foo "53"}
    {:baz "54", :quuz 540, :lucy "54", :bar 540, :foo "54"}
    {:lucy "55", :bar 550, :foo "55", :quuz 550, :baz "55"}
    {:baz "56", :bar 560, :foo "56", :quuz 560, :lucy "56"}
    {:baz "57", :bar 570, :foo "57", :quuz 570, :lucy "57"}
    {:lucy "58", :quuz 580, :baz "58", :bar 580, :foo "58"}
    {:bar 590, :foo "59", :lucy "59", :quuz 590, :baz "59"}
    {:lucy "60", :bar 600, :foo "60", :quuz 600, :baz "60"}]))


    )