Last active
January 20, 2025 17:48
-
-
Save ChrisBlom/940fdeab7066802b203be4380d5ea1ae to your computer and use it in GitHub Desktop.
Datomic change data capture
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns datomic-cdc.core
  "Proof of concept showing how to set up change data capture (CDC) for Datomic."
  (:require [datomic.api :as d]))
(def processed-t-
  "Atom holding the t of the most recently handled transaction.
  Starts out nil; the demo change handler in this namespace resets it after
  each handled transaction and passes it back as `start-t` when restarting."
  (atom nil))
(defn start-cdc-thread
  "Starts a daemon thread that first replays past transactions from the
  Datomic log, beginning at (and including) `start-t`, and then keeps
  processing newly committed transactions from the connection's
  tx-report-queue, calling `change-handler` once per transaction, in t order.

  `change-handler` must be a function that takes a single map argument with
    :t        the t (basis point) of the transaction
    :data     the transacted datoms, as a vector
    :db-after the database as it is after the transaction

  If `start-t` is nil, starts from the beginning of the log.

  If `change-handler` throws, the exception's stack trace is printed, the
  thread ends and the tx-report-queue is removed.

  Returns a 0-arg function that stops the thread when invoked. It returns true
  when it stopped a running thread, and nil if the thread was already stopped."
  [conn start-t change-handler]
  (let [;; Register the queue *before* reading the log, so every transaction that
        ;; is not part of the log range read below is guaranteed to be on the queue.
        txq             (d/tx-report-queue conn)
        txs-in-range    (d/tx-range (d/log conn) start-t nil)
        ;; Newest t in the log at this moment; nil when the range is empty.
        ;; Transactions up to it are replayed from the log and later ones are
        ;; taken from the queue, so no transaction is handled twice even if the
        ;; log grows before the worker iterates the range again.
        range-last-t    (:t (last txs-in-range))
        last-t-in-range (or range-last-t start-t)
        from-queue?     (fn [t]
                          (and (or (nil? start-t) (>= t start-t))
                               (or (nil? range-last-t) (> t range-last-t))))
        running         (atom true)
        cdc-loop        (fn cdc-loop []
                          (try
                            (println "Processing tx's from" (:t (first txs-in-range)) "to" last-t-in-range)
                            ;; process past transactions from start-t up to range-last-t;
                            ;; anything newer is left to the queue.
                            (when range-last-t
                              (let [db (d/db conn)]
                                (doseq [{:keys [t data]} txs-in-range
                                        :while (and @running (<= t range-last-t))]
                                  (change-handler {:t        t
                                                   :data     (into [] data)
                                                   :db-after (d/as-of db t)}))))
                            ;; process new transactions
                            (println "Processing tx's after " last-t-in-range)
                            (println "started cdc thread")
                            (while @running
                              (let [tx       (.take txq)
                                    db-after (:db-after tx)
                                    t        (d/basis-t db-after)]
                                (when (from-queue? t)
                                  (change-handler {:t        t
                                                   :data     (into [] (:tx-data tx))
                                                   :db-after db-after}))))
                            (catch InterruptedException _
                              (println "Stopped cdc thread"))
                            (catch Exception e
                              (.printStackTrace e))
                            (finally
                              ;; If the loop ended without the stop function having been
                              ;; called (i.e. because of an exception), unregister the queue
                              ;; here so it does not keep accumulating unread tx reports.
                              (when (first (swap-vals! running (constantly false)))
                                (d/remove-tx-report-queue conn)))))
        process-thread  (Thread. ^Runnable cdc-loop "datomic-cdc-thread")]
    (.setDaemon process-thread true)
    (.start process-thread)
    (fn cdc-stop-fn []
      (when-let [was-running (first (swap-vals! running (constantly false)))]
        (println "Stopping cdc thread...")
        (d/remove-tx-report-queue conn)
        (.interrupt process-thread)
        was-running))))
(def ^{:doc "Atom holding the :user/follows relation keyed by user name:
  {user-name #{name-of-followed-user ...}}. Kept up to date by
  `followers-handler`."}
  followers-by-user-
  (atom {}))
(defn add-follower
  "Returns `follow-by-user` with `follower` added to the set stored under
  `user`. A missing (or nil) entry is treated as an empty set."
  [follow-by-user user follower]
  (let [existing (get follow-by-user user)
        members  (if (nil? existing) #{} existing)]
    (assoc follow-by-user user (conj members follower))))
(defn remove-follower
  "Returns `follow-by-user` with `follower` removed from the set stored under
  `user`. If `user` has no entry the map is returned unchanged, instead of
  gaining a `user -> nil` entry (which `(disj nil x)` would produce)."
  [follow-by-user user follower]
  (if (contains? follow-by-user user)
    (update follow-by-user user disj follower)
    follow-by-user))
(def feed-by-user
  "Atom initialised to an empty map.
  NOTE(review): not read or written anywhere else in this namespace;
  presumably meant to hold per-user message feeds — confirm or remove."
  (atom {}))
(defn followers-handler
  "Change handler that keeps `followers-by-user-` in sync with the
  :user/follows datoms of one transaction.

  Takes a change-event map with :data (the transaction's datoms) and
  :db-after (the database after the transaction). Every :user/follows datom
  in :data is applied in order: an assertion adds the followed user's
  :user/name to the set stored under the following user's :user/name, and a
  retraction removes it. Returns nil."
  [{:keys [data db-after]}]
  ;; entid is nil until the :user/follows attribute has been installed.
  (when-let [follows-attr (d/entid db-after :user/follows)]
    ;; Handle *all* matching datoms: one transaction can assert or retract
    ;; several follows at once.
    (doseq [[e a v _ add] data
            :when (= a follows-attr)]
      (let [user    (:user/name (d/entity db-after e))
            follows (:user/name (d/entity db-after v))]
        (println "Updating follow list: " user " -> " follows " : " add)
        (swap! followers-by-user- (if add add-follower remove-follower) user follows)))))
(comment
  ;; REPL walkthrough: evaluate the forms below one at a time.
  (def db-uri "datomic:mem://tx-cdc-demo")
  (d/create-database db-uri)
  (def conn (d/connect db-uri))

  ;; Change handler shared by both CDC threads started below: applies the
  ;; change to the follow state, then records the handled t in processed-t-.
  (defn demo-change-handler [change-event]
    (println "Handling change: " change-event)
    (followers-handler change-event)
    (println "Setting processed-t to" (:t change-event))
    (reset! processed-t- (:t change-event)))

  (def cdc-thread (start-cdc-thread conn @processed-t- demo-change-handler))

  ;; setup schema
  @(d/transact conn
               [{:db/ident       :user/name
                 :db/valueType   :db.type/string
                 :db/cardinality :db.cardinality/one
                 :db/unique      :db.unique/identity
                 :db/doc         "The name of the user"}
                {:db/ident       :user/follows
                 :db/valueType   :db.type/ref
                 :db/cardinality :db.cardinality/many
                 :db/doc         "The other users that this user follows"}
                {:db/ident       :user/message
                 :db/valueType   :db.type/string
                 :db/cardinality :db.cardinality/many
                 :db/doc         "The message produced by the user"}
                {:db/ident       :message/content
                 :db/valueType   :db.type/string
                 :db/cardinality :db.cardinality/one
                 :db/doc         "The content of a message"}
                {:db/ident       :message/timestamp
                 :db/valueType   :db.type/instant
                 :db/cardinality :db.cardinality/one
                 :db/doc         "The timestamp when the message was written"}])

  ;; add users
  @(d/transact conn [{:db/id (d/tempid (d/implicit-part 1)) :user/name "Chris Blom"}
                     {:db/id (d/tempid (d/implicit-part 1)) :user/name "Rich Hickey"}
                     {:db/id (d/tempid (d/implicit-part 1)) :user/name "Alan Kay"}
                     {:db/id (d/tempid (d/implicit-part 1)) :user/name "Guy Steele"}])

  ;; follow some people
  @(d/transact conn [[:db/add [:user/name "Chris Blom"] :user/follows [:user/name "Rich Hickey"]]])
  @(d/transact conn [[:db/add [:user/name "Rich Hickey"] :user/follows [:user/name "Alan Kay"]]])
  @(d/transact conn [[:db/add [:user/name "Chris Blom"] :user/follows [:user/name "Alan Kay"]]])
  @(d/transact conn [[:db/add [:user/name "Chris Blom"] :user/follows [:user/name "Guy Steele"]]])

  ;; followers-by-user- now maps each user name to the names of the users they follow
  @followers-by-user-

  ;; unfollow Alan Kay
  @(d/transact conn [[:db/retract [:user/name "Chris Blom"] :user/follows [:user/name "Alan Kay"]]])

  ;; the followers-by-user- state is kept up to date with the changes to the database
  (contains? (get @followers-by-user- "Chris Blom") "Alan Kay")
  ;; => false

  ;; the followers-by-user- state should match the follows relation in the database
  (into {}
        (d/q '[:find ?u-name (set ?f-name)
               :where
               [?u :user/name ?u-name]
               [?u :user/follows ?f]
               [?f :user/name ?f-name]]
             (d/db conn)))

  ;; processed-t- tracks the t of the last processed transaction
  @processed-t-

  ;; the cdc loop can be stopped
  (cdc-thread)

  ;; update a fact while the cdc loop is stopped
  @(d/transact conn [[:db/retract [:user/name "Chris Blom"] :user/follows [:user/name "Rich Hickey"]]])

  ;; followers-by-user- is now out of sync with the database
  (contains? (get @followers-by-user- "Chris Blom") "Rich Hickey")
  ;; => true

  ;; restarting the cdc loop right after @processed-t- processes the transactions
  ;; that were performed while the cdc loop was stopped. start-t is inclusive, so
  ;; start at (inc processed-t) to avoid handling the last transaction twice.
  (def cdc-thread (start-cdc-thread conn (some-> @processed-t- inc) demo-change-handler))

  ;; followers-by-user- is in sync with the database again:
  (contains? (get @followers-by-user- "Chris Blom") "Rich Hickey")
  ;; => false
  )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment