Created
March 31, 2022 15:46
-
-
Save arichiardi/fabd517bbf3a19416e0741cec7b36bf9 to your computer and use it in GitHub Desktop.
Very simple parallel testing utility
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns test.parallel | |
(:require | |
[clojure.core.async :as clj-async :refer [>! go]]) | |
(:import | |
java.util.concurrent.CountDownLatch | |
java.util.concurrent.TimeUnit)) | |
;; Amount of time run-race waits on the "done" CountDownLatch, i.e. for
;; every worker to finish. The unit is given by *latch-timeout-unit*.
(def ^:dynamic *latch-timeout*
  "Parallel task execution timeout.
  How long to wait for all the threads to finish (done CountDownLatch),
  expressed in *latch-timeout-unit*."
  3)
;; The java.util.concurrent.TimeUnit that gives *latch-timeout* its meaning.
;; It is passed, together with *latch-timeout*, to CountDownLatch#await.
(def ^:dynamic *latch-timeout-unit*
  "Time unit (java.util.concurrent.TimeUnit) in which *latch-timeout* is
  expressed when waiting for all the threads to finish (done CountDownLatch)."
  TimeUnit/MINUTES)
;; run-race starts exactly this many workers and sizes its latch and
;; result channel from it.
(def ^:dynamic *max-threads*
  "Parallel thread maximum"
  10)
;; Reported by result-timeout-ex-info; the value is in milliseconds.
(def ^:dynamic *max-timeout-ms*
  "Maximum test timeout, in milliseconds."
  10000)
;; Flag meant to switch the execution log on or off.
(def ^:dynamic *print-execution-log*
  "Whether or not to print the execution log."
  true)
(defn format-thread-prefix
  "Returns the log-line prefix identifying the worker `thread-id`,
  e.g. \"[thd #3]\" for 3."
  [thread-id]
  ;; `format` (not `str`) is deliberate: %s renders nil as \"null\".
  (format "[thd #%s]" thread-id))
(defn done-latch-timeout-ex-info
  "Builds (does not throw) the ex-info describing a timeout while waiting
  for all the threads to finish.

  The message reports the current *latch-timeout* converted to seconds via
  *latch-timeout-unit*; the ex-data carries both raw values under
  :latch-timeout and :latch-timeout-unit."
  []
  (let [^TimeUnit unit *latch-timeout-unit*
        timeout-secs   (.toSeconds unit *latch-timeout*)
        message        (format "Timed out waiting for all threads to finish (> %s seconds)"
                               timeout-secs)]
    (ex-info message
             {:latch-timeout      *latch-timeout*
              :latch-timeout-unit *latch-timeout-unit*})))
(defn result-timeout-ex-info
  "Builds (does not throw) the ex-info describing a timeout while waiting
  for the threads' results.

  The message reports the current *max-timeout-ms*, which is a millisecond
  value and is labelled as such; the ex-data carries it under
  :max-timeout-ms."
  []
  ;; The value is in milliseconds, so the label must say so (it used to
  ;; claim \"seconds\", overstating the timeout by a factor of 1000).
  (ex-info (format "Timed out while waiting for threads results (> %s milliseconds)"
                   *max-timeout-ms*)
           {:max-timeout-ms *max-timeout-ms*}))
(defn run-race
  "Run functions in parallel, starting all at the same time.

  Starts *max-threads* workers. Worker i calls the function at index
  (mod i (count fns)), so the supplied functions are spread round-robin
  over the workers. Every worker first blocks on a shared start latch and
  all of them are released together.

  Returns a {:results [] :exceptions []} map: :results holds the values
  returned by the calls that completed, :exceptions the Throwables thrown
  by the ones that did not. When no function is supplied there is nothing
  to race and the empty map is returned.

  Throws the ex-info built by done-latch-timeout-ex-info when the workers
  have not all finished within *latch-timeout* / *latch-timeout-unit*.

  Prints an execution log to *out* unless *print-execution-log* is falsey."
  [& fns]
  (let [fns (vec fns)
        n   (count fns)
        ;; Single place where the logging switch is honoured.
        log (fn [& xs] (when *print-execution-log* (apply println xs)))]
    (if (zero? n)
      ;; Guard: (mod i 0) would throw in every worker.
      {:results [] :exceptions []}
      (let [worker-count *max-threads*
            ;; The channel is drained only after every worker is done, so
            ;; its buffer must hold every event; each worker emits three
            ;; (started, result-or-exception, ended). 10x is "to be sure".
            result-chan (clj-async/chan (* 10 worker-count))
            start-latch (CountDownLatch. 1)
            done-latch  (CountDownLatch. worker-count)]
        (dotimes [i worker-count]
          ;; Real threads rather than go blocks: the body blocks on a latch
          ;; and runs arbitrary user code, which must not occupy the small,
          ;; fixed go-block dispatch pool (it would also cap how many
          ;; workers can actually race).
          (clj-async/thread
            (try
              (.await start-latch)
              (clj-async/>!! result-chan {:type :log :thread i :msg "started"})
              (let [f (nth fns (mod i n))]
                (clj-async/>!! result-chan {:type :result :thread i :payload (f)}))
              (catch Throwable e
                (clj-async/>!! result-chan {:type :exception :thread i :exception e}))
              (finally
                (clj-async/>!! result-chan {:type :log :thread i :msg "ended"})
                (.countDown done-latch)))))
        (log "Start thread execution")
        (.countDown start-latch)
        (log (format "Waiting for %s threads to be done" (.getCount done-latch)))
        (when-not (.await done-latch (long *latch-timeout*) ^TimeUnit *latch-timeout-unit*)
          (throw (done-latch-timeout-ex-info)))
        (log "All threads done")
        ;; Closing lets the drain loop below terminate: buffered events are
        ;; still delivered, then <!! yields nil.
        (clj-async/close! result-chan)
        (loop [acc {:results [] :exceptions []}]
          (if-let [event (clj-async/<!! result-chan)]
            (let [prefix (format-thread-prefix (:thread event))]
              (case (:type event)
                :log       (do (log prefix (:msg event))
                               (recur acc))
                :exception (let [e (:exception event)]
                             (log prefix (str e))
                             (recur (update acc :exceptions conj e)))
                :result    (let [payload (:payload event)]
                             (log prefix payload)
                             (recur (update acc :results conj payload)))))
            (do (log "Result channel closed")
                acc)))))))
(comment
  ;; REPL example: 50 workers alternate between a function that succeeds
  ;; (returning a unique counter value) and one that always throws, so half
  ;; of the workers yield distinct results and the other half exceptions.
  (require 'clojure.test)
  (binding [*max-threads* 50]
    (let [state (atom 0)
          {:keys [results exceptions]}
          (run-race
           #(hash-map :state (swap! state inc))
           #(throw (ex-info "Oh noes!" {})))]
      ;; Both operands must sit inside the `=` form; a one-argument `=` is
      ;; always true and would make the assertion vacuous.
      (clojure.test/is (= (/ *max-threads* 2) (count (distinct results))))
      (clojure.test/is (= (/ *max-threads* 2) (count exceptions)))))
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment