Skip to content

Instantly share code, notes, and snippets.

@mschubert
Last active August 29, 2015 14:27
Show Gist options
  • Save mschubert/79aae1c2b4f9500c05d3 to your computer and use it in GitHub Desktop.
Save mschubert/79aae1c2b4f9500c05d3 to your computer and use it in GitHub Desktop.
Minimal example of using rzmq to submit a worker job using LSF
#BSUB-J {{ job_name }} # name of the job / array jobs
#BSUB-o {{ log_file }} # output is sent to logfile, stdout + stderr by default
#BSUB-P {{ queue }} # Job queue
#BSUB-W {{ walltime }} # Walltime in minutes
#BSUB-M {{ memory }} # Memory requirements in Mbytes
#BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
#BSUB-R select[panfs_nobackup_research]
R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}"
# use the template & submit
library(infuser)
values = list(
job_name = "jn",
log_file = "~/bsub.log",
queue = "research-rh6",
walltime = 10080,
memory = 1024,
rscript = "/nfs/research2/saezrodriguez/mike/rzmq/worker.r",
args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124)
)
filled = infuser::infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values)
system("bsub", input=filled)
# bind status socket
library(rzmq)
zmq.context = init.context()
socket = init.socket(zmq.context, "ZMQ_REP")
bind.socket(socket, "tcp://*:6124")
fun = function(x, y) x*x+y
job_data = 1:10
job_const = list(y=100)
job_status = rep("queued", length(job_data))
job_result = list()
while(any(job_status %in% c("queued", "running"))) {
msg = receive.socket(socket)
print(msg)
if (msg$id == 0)
send.socket(socket, data=list(fun=fun, const=job_const), send.more=TRUE)
else {
job_status[msg$id] = "done"
job_result[[msg$id]] = msg$result
}
id = which(job_status == "queued")[1]
if (!is.na(id)) {
send.socket(socket, data=list(id=id, iter=job_data[id]))
job_status[id] = "running"
} else
send.socket(socket, data=list(id=0))
Sys.sleep(0.1)
}
# this should be submitted by bsub
# and get the server as argument
master = commandArgs(TRUE)[1]
#master = "tcp://ebi-002.ebi.ac.uk:6124"
print(master)
library(rzmq)
context = init.context()
socket = init.socket(context,"ZMQ_REQ")
connect.socket(socket, master)
send.socket(socket, data=list(id=0))
msg = receive.socket(socket)
print(msg)
fun = msg$fun
const = msg$const
while(TRUE) {
msg = receive.socket(socket)
print(msg)
if (msg$id == 0)
break
result = do.call(fun, c(const, msg$iter))
print(result)
send.socket(socket, data=list(id = msg$id, result=result))
Sys.sleep(1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment