Last active
February 21, 2024 16:47
-
-
Save leonoel/d13dd1b8045b3167dbf688f57244d221 to your computer and use it in GitHub Desktop.
Example of Clojure prepl usage with missionary
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns prepl
  (:require [clojure.core.server :as s]
            [missionary.core :as m])
  (:import (java.io PipedReader PipedWriter)
           (java.util.concurrent Executors ThreadFactory)))
(defn remote "
Returns a function taking a `java.io.PipedWriter` and returning a flow connecting to a remote prepl on given `host` and
`port`, sending the content of the pipe to the remote peer and emitting received evaluation results. The pipe is closed
on flow cancellation.

The reader end of the pipe is connected synchronously when the flow is subscribed, so that a producer started right
after subscription cannot write to a pipe that is not connected yet. The blocking call to
`clojure.core.server/remote-prepl` runs on a dedicated daemon thread.
" [host port]
  (fn [^PipedWriter pipe]
    (m/observe
      (fn [cb]
        ;; Connect before spawning the thread: writing to an unconnected PipedWriter fails, and
        ;; a connection error is better reported here than lost in a background thread.
        (let [reader (PipedReader. pipe)
              ^Runnable pump #(s/remote-prepl host port reader cb)]
          (doto (Thread. pump "prepl-remote")
            ;; Daemon, so a connection that was never cancelled does not keep the JVM alive.
            (.setDaemon true)
            (.start)))
        ;; Cleanup thunk run on cancellation: closing the writer side signals
        ;; end of stream to the reader consumed by `remote-prepl`.
        #(.close pipe)))))
(defn write-form "
Returns a task writing the edn representation of given `form` to given `java.io.Writer`.

The write is performed on `executor`. Printer settings are pinned so the form is always emitted in full and in
readable syntax, whatever the ambient bindings are, and the writer is flushed once the form is written.
" [executor writer form]
  (m/via executor
    (binding [*out* writer
              ;; A truncated or non-readable form would be rejected (or misread) by the remote reader.
              *print-length* nil
              *print-level* nil
              *print-readably* true]
      (prn form)
      (flush))))
(defn- daemon-single-thread-executor "
Returns a single-thread executor whose worker is a daemon thread, so that an executor that is never shut down does
not prevent the JVM from exiting.
" []
  (Executors/newSingleThreadExecutor
    (reify ThreadFactory
      (newThread [_ task]
        (doto (Thread. task "prepl-writer")
          (.setDaemon true))))))

(defn send-forms "
Returns a flow serializing and writing forms produced by flow `forms` to a `java.io.PipedWriter`, and concurrently
running the flow returned by the `channel` function, called with the pipe.

The resulting flow emits the values produced by the channel flow; the writing branch emits nothing. All writes of a
given flow go through one dedicated single-thread executor created alongside the pipe.
" [channel forms]
  (m/ap
    (let [writer (PipedWriter.)
          ;; One writer thread per pipe, created before forking.
          ;; NOTE(review): presumably a long-lived writer thread is wanted because a piped reader
          ;; reports a broken pipe once the last writing thread has died - confirm.
          executor (daemon-single-thread-executor)]
      (m/amb=
        ;; Branch 1: run the channel and forward everything it emits.
        (m/?> (channel writer))
        ;; Branch 2: write each incoming form in turn, then emit nothing.
        (do (m/? (write-form executor writer (m/?> forms)))
            (m/amb))))))
(defn process-tuples "
Returns a flow processing each element of tuples produced by given flow `input` with a separate flow processor.
The flow processors `ps` must be a sequence matching the shape of expected tuples. Resulting tuples are collapsed with
given function `f`.

Each processor is a function from flow to flow; the processor at index `i` receives the flow of the `i`-th elements
of the input tuples, and `f` is called with one value from each processor output.
" [f ps input]
  (let [;; Shared publisher, so that every processor reads from the same run of `input`.
        >input (m/stream input)
        ;; Feeds processor `p` with the i-th element of each incoming tuple.
        project (fn [i p] (p (m/sample #(nth % i) >input)))]
    (apply m/zip f (map-indexed project ps))))
(comment
  ;; Start a server. Port 0 lets the OS pick a free port, read back below with `.getLocalPort`.
  (def server
    (s/start-server
      {:accept 'clojure.core.server/io-prepl
       :port   0
       :name   "jvm"}))

  ;; Connect to the server and define a function `eval!` to ask for remote evaluation.
  ;; `eval!` returns a task completing with the result of evaluation.
  (def cancel
    ((->> (m/observe
            (fn [!]
              ;; Each request is a tuple [d x]: a fresh dataflow variable and the form to evaluate.
              (defn eval! [x]
                (let [d (m/dfv)]
                  (! [d x]) d))
              ;; On cancellation, make `eval!` unusable.
              #(def eval! nil)))
          ;; The dataflow variables pass through untouched while the forms are sent to the remote prepl;
          ;; each value coming back is then assigned to the matching variable.
          (process-tuples (fn [d x] (d x))
                          [identity (partial send-forms (remote "localhost" (.getLocalPort server)))])
          ;; `{}` used as a reducing function returns its second argument: keep the latest value only.
          (m/reduce {} nil))
     (fn [_] (println "Remote prepl terminated."))
     (fn [^Throwable e] (.printStackTrace e))))

  (m/? (eval! '(* 6 7)))

  (cancel)

  ;; Release the listening socket opened by `start-server`.
  (s/stop-server "jvm")
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment