Skip to content

Instantly share code, notes, and snippets.

@mschubert
Last active August 29, 2015 14:27

Revisions

  1. mschubert revised this gist Aug 20, 2015. 2 changed files with 4 additions and 6 deletions.
    6 changes: 3 additions & 3 deletions master.r
    Original file line number Diff line number Diff line change
    @@ -16,7 +16,7 @@ socket = init.socket(zmq.context, "ZMQ_REP")
    bind.socket(socket, "tcp://*:6124")

    # do the submissions
    njobs = 2
    njobs = 10
    for (j in 1:njobs) {
    values$job_name = paste0("rzmq-", j)
    values$log_file = paste0(values$job_name, ".log")
    @@ -25,7 +25,7 @@ for (j in 1:njobs) {

    # define the function to run on the workers and input data
    fun = function(x, y) x*x+y
    job_data = 1:10
    job_data = 1:1e4
    job_const = list(y=100)
    job_status = factor(rep("queued", length(job_data)), levels=c("queued", "running", "done"))
    job_result = list()
    @@ -48,5 +48,5 @@ while(any(job_status %in% c("queued", "running"))) {
    } else
    send.socket(socket, data=list(id=0))

    Sys.sleep(0.1)
    Sys.sleep(0.001)
    }
    4 changes: 1 addition & 3 deletions worker.r
    Original file line number Diff line number Diff line change
    @@ -20,11 +20,9 @@ while(TRUE) {
    if (msg$id == 0)
    break

    result = do.call(fun, c(const, msg$iter))
    result = try(do.call(fun, c(const, msg$iter)))

    print(result)

    send.socket(socket, data=list(id = msg$id, result=result))

    Sys.sleep(1)
    }
  2. mschubert revised this gist Aug 16, 2015. 1 changed file with 14 additions and 6 deletions.
    20 changes: 14 additions & 6 deletions master.r
    Original file line number Diff line number Diff line change
    @@ -1,28 +1,36 @@
    # use the template & submit
    library(rzmq)
    library(infuser)

    # use the template & submit
    values = list(
    job_name = "jn",
    log_file = "bsub.log",
    queue = "research-rh6",
    walltime = 10080,
    memory = 1024,
    rscript = "worker.r",
    args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124)
    )
    system("bsub", input=infuse("LSF.tmpl", values))

    # bind status socket
    library(rzmq)
    zmq.context = init.context()
    socket = init.socket(zmq.context, "ZMQ_REP")
    bind.socket(socket, "tcp://*:6124")

    # do the submissions
    njobs = 2
    for (j in 1:njobs) {
    values$job_name = paste0("rzmq-", j)
    values$log_file = paste0(values$job_name, ".log")
    system("bsub", input=infuse("LSF.tmpl", values))
    }

    # define the function to run on the workers and input data
    fun = function(x, y) x*x+y
    job_data = 1:10
    job_const = list(y=100)
    job_status = rep("queued", length(job_data))
    job_status = factor(rep("queued", length(job_data)), levels=c("queued", "running", "done"))
    job_result = list()

    # send and receive messages
    while(any(job_status %in% c("queued", "running"))) {
    msg = receive.socket(socket)
    print(msg)
  3. mschubert revised this gist Aug 15, 2015. 1 changed file with 3 additions and 4 deletions.
    7 changes: 3 additions & 4 deletions master.r
    Original file line number Diff line number Diff line change
    @@ -2,15 +2,14 @@
    library(infuser)
    values = list(
    job_name = "jn",
    log_file = "~/bsub.log",
    log_file = "bsub.log",
    queue = "research-rh6",
    walltime = 10080,
    memory = 1024,
    rscript = "/nfs/research2/saezrodriguez/mike/rzmq/worker.r",
    rscript = "worker.r",
    args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124)
    )
    filled = infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values)
    system("bsub", input=filled)
    system("bsub", input=infuse("LSF.tmpl", values))

    # bind status socket
    library(rzmq)
  4. mschubert revised this gist Aug 15, 2015. 3 changed files with 8 additions and 8 deletions.
    12 changes: 6 additions & 6 deletions LSF.tmpl
    Original file line number Diff line number Diff line change
    @@ -1,9 +1,9 @@
    #BSUB-J {{ job_name }} # name of the job / array jobs
    #BSUB-o {{ log_file }} # output is sent to logfile, stdout + stderr by default
    #BSUB-P {{ queue }} # Job queue
    #BSUB-W {{ walltime }} # Walltime in minutes
    #BSUB-M {{ memory }} # Memory requirements in Mbytes
    #BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
    #BSUB-J {{ job_name }} # name of the job / array jobs
    #BSUB-o {{ log_file | /dev/null }} # output is sent to logfile, stdout + stderr by default
    #BSUB-P {{ queue }} # Job queue
    #BSUB-W {{ walltime }} # Walltime in minutes
    #BSUB-M {{ memory | 4096 }} # Memory requirements in Mbytes
    #BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
    #BSUB-R select[panfs_nobackup_research]

    R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}"
    2 changes: 1 addition & 1 deletion master.r
    Original file line number Diff line number Diff line change
    @@ -9,7 +9,7 @@ values = list(
    rscript = "/nfs/research2/saezrodriguez/mike/rzmq/worker.r",
    args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124)
    )
    filled = infuser::infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values)
    filled = infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values)
    system("bsub", input=filled)

    # bind status socket
    2 changes: 1 addition & 1 deletion worker.r
    Original file line number Diff line number Diff line change
    @@ -6,7 +6,7 @@ print(master)

    library(rzmq)
    context = init.context()
    socket = init.socket(context,"ZMQ_REQ")
    socket = init.socket(context, "ZMQ_REQ")
    connect.socket(socket, master)
    send.socket(socket, data=list(id=0))
    msg = receive.socket(socket)
  5. mschubert revised this gist Aug 15, 2015. No changes.
  6. mschubert revised this gist Aug 15, 2015. No changes.
  7. mschubert created this gist Aug 15, 2015.
    10 changes: 10 additions & 0 deletions LSF.tmpl
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,10 @@
    #BSUB-J {{ job_name }} # name of the job / array jobs
    #BSUB-o {{ log_file }} # output is sent to logfile, stdout + stderr by default
    #BSUB-P {{ queue }} # Job queue
    #BSUB-W {{ walltime }} # Walltime in minutes
    #BSUB-M {{ memory }} # Memory requirements in Mbytes
    #BSUB-R rusage[mem={{ memory | 4096 }}] # Memory requirements in Mbytes
    #BSUB-R select[panfs_nobackup_research]

    R --no-save --no-restore --args "{{ args }}" < "{{ rscript }}"

    45 changes: 45 additions & 0 deletions master.r
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,45 @@
    # use the template & submit
    library(infuser)
    values = list(
    job_name = "jn",
    log_file = "~/bsub.log",
    queue = "research-rh6",
    walltime = 10080,
    memory = 1024,
    rscript = "/nfs/research2/saezrodriguez/mike/rzmq/worker.r",
    args = sprintf("tcp://%s:%i", Sys.info()[['nodename']], 6124)
    )
    filled = infuser::infuse("/nfs/research2/saezrodriguez/mike/rzmq/LSF.tmpl", values)
    system("bsub", input=filled)

    # bind status socket
    library(rzmq)
    zmq.context = init.context()
    socket = init.socket(zmq.context, "ZMQ_REP")
    bind.socket(socket, "tcp://*:6124")

    fun = function(x, y) x*x+y
    job_data = 1:10
    job_const = list(y=100)
    job_status = rep("queued", length(job_data))
    job_result = list()

    while(any(job_status %in% c("queued", "running"))) {
    msg = receive.socket(socket)
    print(msg)
    if (msg$id == 0)
    send.socket(socket, data=list(fun=fun, const=job_const), send.more=TRUE)
    else {
    job_status[msg$id] = "done"
    job_result[[msg$id]] = msg$result
    }

    id = which(job_status == "queued")[1]
    if (!is.na(id)) {
    send.socket(socket, data=list(id=id, iter=job_data[id]))
    job_status[id] = "running"
    } else
    send.socket(socket, data=list(id=0))

    Sys.sleep(0.1)
    }
    30 changes: 30 additions & 0 deletions worker.r
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,30 @@
    # this should be submitted by bsub
    # and get the server as argument
    master = commandArgs(TRUE)[1]
    #master = "tcp://ebi-002.ebi.ac.uk:6124"
    print(master)

    library(rzmq)
    context = init.context()
    socket = init.socket(context,"ZMQ_REQ")
    connect.socket(socket, master)
    send.socket(socket, data=list(id=0))
    msg = receive.socket(socket)
    print(msg)
    fun = msg$fun
    const = msg$const

    while(TRUE) {
    msg = receive.socket(socket)
    print(msg)
    if (msg$id == 0)
    break

    result = do.call(fun, c(const, msg$iter))

    print(result)

    send.socket(socket, data=list(id = msg$id, result=result))

    Sys.sleep(1)
    }