--- title: "Technical Documentation" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Technical Documentation} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{css echo=FALSE} img { border: 0px !important; margin: 2em 2em 2em 2em !important; } code { border: 0px !important; } ``` ```{r echo=FALSE, results="hide"} knitr::opts_chunk$set( cache = FALSE, echo = TRUE, collapse = TRUE, comment = "#>" ) options(clustermq.scheduler = "local") suppressPackageStartupMessages(library(clustermq)) ``` ## Worker API ### Base API and schedulers The main worker functions are wrapped in an _R6_ class with the name of `QSys`. This provides a standardized API to the [lower-level messages](https://mschubert.github.io/clustermq/articles/technicaldocs.html#zeromq-message-specification) that are sent via [_ZeroMQ_](https://zeromq.org/). The base class itself is derived in scheduler classes that add the required functions for submitting and cleaning up jobs: ``` + QSys |- Multicore |- LSF + SGE |- PBS |- Torque |- etc. ``` The user-visible object is a worker `Pool` that wraps this, and will eventually allow to manage different workers. ### Workers #### Creating a worker pool A pool of workers can be created using the `workers()` function, which instantiates a `Pool` object of the corresponding `QSys`-derived scheduler class. See `?workers` for details. ```{r eval=FALSE} # start up a pool of three workers using the default scheduler w = workers(n_jobs=3) # if we make an unclean exit for whatever reason, clean up the jobs on.exit(w$cleanup()) ``` #### Worker startup For workers that are started up via a scheduler, we do not know which machine they will run on. This is why we start up every worker with a TCP/IP address of the master socket that will distribute work. This is achieved by the call to R common to all schedulers: ```{sh eval=FALSE} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")' ``` #### Worker communication On the master's side, we wait until a worker connects: ```{r eval=FALSE} msg = w$recv() # this will block until a worker is ready ``` We can then send any expression to be evaluated on the worker using the `send` method: ```{r eval=FALSE} w$send(expression, ...) ``` After the expression (in `...`), any variables that should be passed along with the call can be added. For batch processing that `clustermq` usually does, this command is `work_chunk`, where the `chunk` data is added: ```{r eval=FALSE} w$send(clustermq:::work_chunk(chunk, fun, const, rettype, common_seed), chunk = chunk(iter, submit_index)) ``` #### Worker environment We can add any number of objects to a worker environment using the `env` method: ```{r eval=FALSE} w$env(object=value, ...) ``` This will also invisibly return a `data.frame` with all objects currently in the environment. If a user wants to inspect the environment without changing it they can call `w$env()` without arguments. The environment will be propagated to all workers automatically in a greedy fashion. ### Main event loop Putting the above together in an event loop, we get what is essentially implemented in `master`. `w$send` invisibly returns an identifier to track which call was submitted, and `w$current()` matches the same to `w$recv()`. ```{r eval=FALSE} w = workers(3) on.exit(w$cleanup()) w$env(...) while (we have new work to send || jobs pending) { res = w$recv() # the result of the call, or NULL for a new worker w$current()$call_ref # matches answer to request, -1 otherwise # handle result if (more work) call_ref = w$send(expression, ...) # call_ref tracks request identity else w$send_shutdown() } ``` A loop of a similar structure can be used to extend `clustermq`. As an example, [this was done by the _targets_ package](https://github.com/ropensci/targets/blob/1.2.2/R/class_clustermq.R). ## ZeroMQ message specification Communication between the `master` (main event loop) and workers (`QSys` base class) is organised in _messages_. These are chunks of serialized data sent via _ZeroMQ_'s protocol (_ZMTP_). The parts of each message are called *frames*. ### Master - Worker communication The master requests an evaluation in a message with X frames (direct) or Y if proxied. This is all handled by _clustermq_ internally. * The worker identity frame or routing identifier * A delimiter frame * Worker status (`wlife_t`) * The call to be evaluated * _N_ repetitions of: * The variable name of an environment object that is not yet present on the worker * The variable value If using a proxy, this will be followed by a `SEXP` that contains variable names the proxy should add before forwarding to the worker. ### Worker evaluation A worker evaluates the call using the R C API: ```{r eval=FALSE} R_tryEvalSilent(cmd, env, &err); ``` If an error occurs in this evaluation will be returned as a structure with class `worker_error`. If a developer wants to catch errors and warnings in a more fine-grained manner, it is recommended to add their own `callingHandlers` to `cmd` (as _clustermq_ does work its `work_chunk`). ### Worker - Master communication The result of this evaluation is then returned in a message with four (direct) or five (proxied) frames: * Worker identity frame (handled internally by _ZeroMQ_'s `ZMQ_REQ` socket) * Empty frame (handled internally by _ZeroMQ_'s `ZMQ_REQ` socket) * Worker status (`wlife_t`) that is handled internally by _clustermq_ * The result of the call (`SEXP`), visible to the user If using a worker via SSH, these frames will be preceded by a routing identify frame that is handled internally by _ZeroMQ_ and added or peeled off by the proxy.