Last active
May 1, 2017 07:07
-
-
Save robert-stuttaford/54029eaf9eb37e96d450 to your computer and use it in GitHub Desktop.
Using Onyx with Trapperkeeper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
;; Trapperkeeper configuration for the Onyx service.
;; The service reads the whole :onyx map via (get-in-config [:onyx]).
{:environment :production
 :global      {:logging-config "./logback.xml"}
 :onyx        {;; Passed through as :onyx.peer/job-scheduler.
               :job-scheduler  :onyx.job-scheduler/balanced
               ;; Attached to every job submitted through the service.
               :task-scheduler :onyx.task-scheduler/balanced
               ;; Merged verbatim into the peer configuration.
               :peer-config    {:onyx.messaging/impl                  :netty
                                :onyx.messaging/peer-port-range       [40200 40220]
                                :onyx.messaging/peer-ports            [40199]
                                :onyx.messaging/bind-addr             "localhost"
                                :onyx.messaging/backpressure-strategy :high-restart-latency}
               ;; Number of peers started by the service.
               :peer-count     20
               ;; :address becomes :zookeeper/address; :port becomes
               ;; :zookeeper.server/port for the env (server) configuration.
               :zookeeper      {:address "127.0.0.1:2186"
                                :port    2186}}}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns a.trapperkeeper.service.onyx | |
(:require [clojure.tools.logging :as log] | |
[onyx.api :as onyx] | |
[plumbing.core :refer :all] | |
[puppetlabs.trapperkeeper.core :as tk] | |
[puppetlabs.trapperkeeper.services :refer [service-context]]) | |
(:import [java.util UUID])) | |
;; Builds an Onyx configuration map from the service's :onyx settings.
;;
;; Always contains :zookeeper/address, :onyx/id and :onyx.peer/job-scheduler.
;; With :server? truthy, the ZooKeeper server keys are added (env config).
;; With :peer? truthy, every entry of peer-config is merged in last
;; (peer config). Both flags default to false.
(defnk config [onyx-id job-scheduler {server? false} {peer? false} peer-config zookeeper]
  (let [base-opts   {:zookeeper/address       (:address zookeeper)
                     :onyx/id                 onyx-id
                     :onyx.peer/job-scheduler job-scheduler}
        ;; nil fragments are ignored by `merge`, so a disabled flag adds nothing.
        server-opts (when server?
                      {:zookeeper/server?     true
                       :zookeeper.server/port (:port zookeeper)})
        peer-opts   (when peer?
                      peer-config)]
    (merge base-opts server-opts peer-opts)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; | |
;;; Service | |
(defprotocol OnyxService
  "Job control operations exposed by the Onyx Trapperkeeper service."
  (submit-job [this catalog workflow flow-conditions lifecycles]
    "Submits a job assembled from the given catalog, workflow,
    flow-conditions and lifecycles.")
  (kill-job [this job-uuid]
    "Kills the job identified by job-uuid."))
;; Trapperkeeper service that owns an Onyx env, a peer group and its peers.
;;
;; start: reads the :onyx config, tags it with a fresh random :onyx-id, starts
;;        the env, the peer group and :peer-count peers, and stores all of
;;        them (plus the peer config) in the service context.
;; stop:  shuts down the peers, then the peer group, then the env. Each step
;;        is best-effort: failures are logged and shutdown continues.
(tk/defservice service
  OnyxService
  [[:ConfigService get-in-config]]

  (start [this context]
    (log/info "Starting Onyx service")
    (let [onyx-config (-> (get-in-config [:onyx])
                          (assoc :onyx-id (UUID/randomUUID)))
          env-config  (config (assoc onyx-config :server? true))
          peer-config (config (assoc onyx-config :peer? true))
          env         (onyx/start-env env-config)
          peer-group  (onyx/start-peer-group peer-config)
          peers       (onyx/start-peers (:peer-count onyx-config) peer-group)]
      ;; Keep the peer group in the context so `stop` can release it.
      (assoc context
             :env env
             :peer-config peer-config
             :peer-group peer-group
             :peers peers)))

  (stop [this {:keys [env peer-group peers] :as context}]
    (log/info "Stopping Onyx service")
    (doseq [peer peers]
      (try
        (onyx/shutdown-peer peer)
        ;; Throwable goes first so the logger records the stack trace.
        (catch Throwable e (log/info e "Error during peer shutdown:"))))
    ;; Guarded with `when` so a partially failed `start` does not produce
    ;; spurious shutdown errors for resources that were never created.
    (when peer-group
      (try
        (log/info "* Stopping peer group")
        (onyx/shutdown-peer-group peer-group)
        (catch Throwable e (log/warn e "Error during peer group shutdown:"))))
    (when env
      (try
        (log/info "* Stopping env")
        (onyx/shutdown-env env)
        (catch Throwable e (log/warn e "Error during env shutdown:"))))
    ;; Drop every key `start` added so no stale handles remain in the context.
    (dissoc context :env :peer-config :peer-group :peers))

  (submit-job
    [this catalog workflow flow-conditions lifecycles]
    (log/info "Submitting job")
    (onyx/submit-job (:peer-config (service-context this))
                     {:catalog         catalog
                      :workflow        workflow
                      :lifecycles      lifecycles
                      :flow-conditions flow-conditions
                      :task-scheduler  (get-in-config [:onyx :task-scheduler])}))

  (kill-job
    [this job-uuid]
    (log/info "Killing job")
    (onyx/kill-job (:peer-config (service-context this)) job-uuid)))
;; We have another service which calls (submit-job) in its own (start) implementation, and (kill-job) in its (stop). | |
;; This job has Datomic tx-report-queue :input and Datomic transact :output catalog items. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment