Last active
August 29, 2015 14:27
Revisions
-
mschubert revised this gist
Aug 20, 2015 . 2 changed files with 4 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -16,7 +16,7 @@ socket = init.socket(zmq.context, "ZMQ_REP") bind.socket(socket, "tcp://*:6124") # do the submissions njobs = 10 for (j in 1:njobs) { values$job_name = paste0("rzmq-", j) values$log_file = paste0(values$job_name, ".log") @@ -25,7 +25,7 @@ for (j in 1:njobs) { # define the function to run on the workers and input data fun = function(x, y) x*x+y job_data = 1:1e4 job_const = list(y=100) job_status = factor(rep("queued", length(job_data)), levels=c("queued", "running", "done")) job_result = list() @@ -48,5 +48,5 @@ while(any(job_status %in% c("queued", "running"))) { } else send.socket(socket, data=list(id=0)) Sys.sleep(0.001) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -20,11 +20,9 @@ while(TRUE) { if (msg$id == 0) break result = try(do.call(fun, c(const, msg$iter))) print(result) send.socket(socket, data=list(id = msg$id, result=result)) } -
mschubert revised this gist
Aug 16, 2015 . 1 changed file with 14 additions and 6 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,28 +1,36 @@ library(rzmq) library(infuser) # use the template & submit values = list( queue = "research-rh6", walltime = 10080, memory = 1024, rscript = "worker.r", args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124) ) # bind status socket zmq.context = init.context() socket = init.socket(zmq.context, "ZMQ_REP") bind.socket(socket, "tcp://*:6124") # do the submissions njobs = 2 for (j in 1:njobs) { values$job_name = paste0("rzmq-", j) values$log_file = paste0(values$job_name, ".log") system("bsub", input=infuse("LSF.tmpl", values)) } # define the function to run on the workers and input data fun = function(x, y) x*x+y job_data = 1:10 job_const = list(y=100) job_status = factor(rep("queued", length(job_data)), levels=c("queued", "running", "done")) job_result = list() # send and receive messages while(any(job_status %in% c("queued", "running"))) { msg = receive.socket(socket) print(msg) -
mschubert revised this gist
Aug 15, 2015 . 1 changed file with 3 additions and 4 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,15 +2,14 @@ library(infuser) values = list( job_name = "jn", log_file = "bsub.log", queue = "research-rh6", walltime = 10080, memory = 1024, rscript = "worker.r", args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124) ) system("bsub", input=infuse("LSF.tmpl", values)) # bind status socket library(rzmq) -
mschubert revised this gist
Aug 15, 2015 . 3 changed files with 8 additions and 8 deletions.There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,9 +1,9 @@ #BSUB-J {{ job_name }} # name of the job / array jobs #BSUB-o {{ log_file | /dev/null }} # output is sent to logfile, stdout + stderr by default #BSUB-P {{ queue }} # Job queue #BSUB-W {{ walltime }} # Walltime in minutes #BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes #BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes #BSUB-R select[panfs_nobackup_research] R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}" This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -9,7 +9,7 @@ values = list( rscript = "/nfs/research2/saezrodriguez/mike/rzmq/worker.r", args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124) ) filled = infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values) system("bsub", input=filled) # bind status socket This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -6,7 +6,7 @@ print(master) library(rzmq) context = init.context() socket = init.socket(context, "ZMQ_REQ") connect.socket(socket, master) send.socket(socket, data=list(id=0)) msg = receive.socket(socket) -
mschubert revised this gist
Aug 15, 2015 . No changes.There are no files selected for viewing
-
mschubert revised this gist
Aug 15, 2015 . No changes.There are no files selected for viewing
-
mschubert created this gist
Aug 15, 2015 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,10 @@ #BSUB-J {{ job_name }} # name of the job / array jobs #BSUB-o {{ log_file }} # output is sent to logfile, stdout + stderr by default #BSUB-P {{ queue }} # Job queue #BSUB-W {{ walltime }} # Walltime in minutes #BSUB-M {{ memory }} # Memory requirements in Mbytes #BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes #BSUB-R select[panfs_nobackup_research] R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}" This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,45 @@ # use the template & submit library(infuser) values = list( job_name = "jn", log_file = "~/bsub.log", queue = "research-rh6", walltime = 10080, memory = 1024, rscript = "/nfs/research2/saezrodriguez/mike/rzmq/worker.r", args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124) ) filled = infuser::infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values) system("bsub", input=filled) # bind status socket library(rzmq) zmq.context = init.context() socket = init.socket(zmq.context, "ZMQ_REP") bind.socket(socket, "tcp://*:6124") fun = function(x, y) x*x+y job_data = 1:10 job_const = list(y=100) job_status = rep("queued", length(job_data)) job_result = list() while(any(job_status %in% c("queued", "running"))) { msg = receive.socket(socket) print(msg) if (msg$id == 0) send.socket(socket, data=list(fun=fun, const=job_const), send.more=TRUE) else { job_status[msg$id] = "done" job_result[[msg$id]] = msg$result } id = which(job_status == "queued")[1] if (!is.na(id)) { send.socket(socket, data=list(id=id, iter=job_data[id])) job_status[id] = "running" } else send.socket(socket, data=list(id=0)) Sys.sleep(0.1) } This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,30 @@ # this should be submitted by bsub # and get the server as argument master = commandArgs(TRUE)[1] #master = "tcp://ebi-002.ebi.ac.uk:6124" print(master) library(rzmq) context = init.context() socket = init.socket(context,"ZMQ_REQ") connect.socket(socket, master) send.socket(socket, data=list(id=0)) msg = receive.socket(socket) print(msg) fun = msg$fun const = msg$const while(TRUE) { msg = receive.socket(socket) print(msg) if (msg$id == 0) break result = do.call(fun, c(const, msg$iter)) print(result) send.socket(socket, data=list(id = msg$id, result=result)) Sys.sleep(1) }