---
title: "mirai - Minimalist Async Evaluation Framework for R"
vignette: >
  %\VignetteIndexEntry{mirai - Minimalist Async Evaluation Framework for R}
  %\VignetteEngine{litedown::vignette}
  %\VignetteEncoding{UTF-8}
---

### Table of Contents

1. [Example 1: Compute-intensive Operations](#example-1-compute-intensive-operations)
2. [Example 2: I/O-bound Operations](#example-2-io-bound-operations)
3. [Example 3: Resilient Pipelines](#example-3-resilient-pipelines)
4. [Daemons: Local Persistent Processes](#daemons-local-persistent-processes)
5. [Distributed Computing: Remote Daemons](#distributed-computing-remote-daemons)
6. [Distributed Computing: Launching Daemons](#distributed-computing-launching-daemons)
7. [Distributed Computing: TLS Secure Connections](#distributed-computing-tls-secure-connections)
8. [Compute Profiles](#compute-profiles)
9. [Errors, Interrupts and Timeouts](#errors-interrupts-and-timeouts)
10. [Serialization - Arrow, polars and beyond](#serialization-arrow-polars-and-beyond)
11. [Asynchronous Parallel Map](#asynchronous-parallel-map)
12. [Using mirai in a Package](#using-mirai-in-a-package)

### Example 1: Compute-intensive Operations

Use case: minimise execution times by performing long-running tasks concurrently in separate processes.

Multiple long computes (model fits etc.) can be performed in parallel on available computing cores.

Use `mirai()` to evaluate an expression asynchronously in a separate, clean R process.

The following mimics an expensive calculation that eventually returns a random value.

``` r
library(mirai)

x <- list(time = 2L, mean = 4)

m <- mirai({Sys.sleep(time); rnorm(5L, mean)}, time = x$time, mean = x$mean)
```

The mirai expression is evaluated in another process and hence must be self-contained, not referring to variables that do not already exist there. Above, the variables `time` and `mean` are passed as part of the `mirai()` call.

A 'mirai' object is returned immediately - creating a mirai never blocks the session.
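
The self-containment requirement can be illustrated with a minimal sketch. The variable name `my_factor` is purely illustrative, and the sketch assumes no daemons have been set, so each mirai runs in a fresh process. `is_mirai_error()` is described in the section on errors further below.

``` r
library(mirai)

my_factor <- 10

# `my_factor` exists only in the calling session, so this expression is not
# self-contained and evaluation in the clean process is expected to fail
m_missing <- mirai(my_factor * 2)

# passing the object by name makes the expression self-contained
m_passed <- mirai(my_factor * 2, my_factor = my_factor)

missing_result <- m_missing[]
passed_result <- m_passed[]

is_mirai_error(missing_result)
passed_result
```

The first mirai resolves to an error value rather than raising an error in the calling session, whilst the second returns the computed value.
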
Whilst the async operation is ongoing, attempting to access a mirai's data yields an 'unresolved' logical NA.

``` r
m
#> < mirai [] >
m$data
#> 'unresolved' logi NA
```

To check whether a mirai remains unresolved (yet to complete):

``` r
unresolved(m)
#> [1] TRUE
```

To wait for and collect the return value, use the mirai's `[]` method:

``` r
m[]
#> [1] 3.495676 3.816495 4.628665 3.491260 5.778629
```

As a mirai represents an async operation, it is never necessary to wait for it. Other code can continue to be run. Once it completes, the return value automatically becomes available at `$data`.

``` r
m
#> < mirai [$data] >
m$data
#> [1] 3.495676 3.816495 4.628665 3.491260 5.778629
```

For easy programmatic use of `mirai()`, '.expr' accepts a pre-constructed language object, and '.args' accepts a list of named arguments. So, the following would be equivalent to the above:

``` r
expr <- quote({Sys.sleep(time); rnorm(5L, mean)})
args <- list(time = x$time, mean = x$mean)

m <- mirai(.expr = expr, .args = args)
m[]
#> [1] 3.425921 4.885346 4.717019 2.501762 5.443241
```

[« Back to ToC](#table-of-contents)

### Example 2: I/O-bound Operations

Use case: ensure execution flow of the main process is not blocked.

High-frequency real-time data cannot be written to file/database synchronously without disrupting the execution flow.

Cache data in memory and use `mirai()` to perform periodic write operations concurrently in a separate process.

Below, '.args' is used to pass `environment()`, which is the calling environment. This provides a convenient method of passing in existing objects.

``` r
library(mirai)

x <- rnorm(1e6)
file <- tempfile()

m <- mirai(write.csv(x, file = file), .args = environment())
```

A 'mirai' object is returned immediately.

`unresolved()` may be used in control flow statements to perform actions which depend on resolution of the 'mirai', both before and after.
This means there is no need to actually wait (block) for a 'mirai' to resolve, as the example below demonstrates.

``` r
# unresolved() queries for resolution itself so no need to use it again within the while loop
while (unresolved(m)) {
  cat("while unresolved\n")
  Sys.sleep(0.5)
}
#> while unresolved
#> while unresolved

cat("Write complete:", is.null(m$data))
#> Write complete: TRUE
```

Now actions which depend on the resolution may be processed, for example the next write.

[« Back to ToC](#table-of-contents)

### Example 3: Resilient Pipelines

Use case: isolating code that can potentially fail in a separate process to ensure continued uptime.

As part of a data science / machine learning pipeline, iterations of model training may periodically fail for stochastic and uncontrollable reasons (e.g. buggy memory management on graphics cards).

Running each iteration in a 'mirai' isolates this potentially-problematic code such that even if it does fail, it does not bring down the entire pipeline.

``` r
library(mirai)

run_iteration <- function(i) {

  if (runif(1) < 0.1) stop("random error\n", call. = FALSE) # simulates a stochastic error rate
  sprintf("iteration %d successful\n", i)

}

for (i in 1:10) {

  m <- mirai(run_iteration(i), environment())
  while (is_error_value(call_mirai(m)$data)) {
    cat(m$data)
    m <- mirai(run_iteration(i), environment())
  }
  cat(m$data)

}
#> iteration 1 successful
#> iteration 2 successful
#> iteration 3 successful
#> iteration 4 successful
#> iteration 5 successful
#> iteration 6 successful
#> iteration 7 successful
#> Error: random error
#> iteration 8 successful
#> iteration 9 successful
#> iteration 10 successful
```

Further, by testing the return value of each 'mirai' for errors, error-handling code is then able to automate recovery and re-attempts, as in the above example. Further details on [error handling](#errors-interrupts-and-timeouts) can be found in the section below.
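
The loop above retries until an iteration succeeds. Where failures may be persistent rather than stochastic, a bounded number of attempts may be preferable. The following is a minimal sketch only: `retry_mirai()` and its `max_tries` argument are hypothetical names introduced here for illustration and are not part of mirai.

``` r
library(mirai)

# Hypothetical helper (not part of mirai): evaluate `task()` in a mirai,
# re-attempting on an error value up to `max_tries` times in total.
retry_mirai <- function(task, max_tries = 3L) {
  result <- NULL
  for (attempt in seq_len(max_tries)) {
    result <- call_mirai(mirai(task(), task = task))$data
    if (!is_error_value(result)) break
  }
  result # the successful value, or the last error value
}

ok <- retry_mirai(function() "done")
failed <- retry_mirai(function() stop("always fails"), max_tries = 2L)

is_error_value(ok)
is_error_value(failed)
```

Returning the last error value, rather than signalling an error, leaves the decision on how to proceed with the calling pipeline.
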
The end result is a resilient and fault-tolerant pipeline that minimises downtime by eliminating interruptions of long computes.

[« Back to ToC](#table-of-contents)

### Daemons: Local Persistent Processes

Daemons, or persistent background processes, may be set to receive 'mirai' requests.

This is potentially more efficient as new processes no longer need to be created on an *ad hoc* basis.

Daemons inherit the default system configuration and read in the relevant '.Renviron' and '.Rprofile' etc. on startup. They also load the default packages. To instead only load the `base` package (which cuts out more than half of R's startup time), the environment variable `R_SCRIPT_DEFAULT_PACKAGES=NULL` may be set prior to launching daemons.

#### With Dispatcher (default)

Call `daemons()` specifying the number of daemons to launch.

``` r
daemons(6)
#> [1] 6
```

To view the current status, `status()` provides the number of active connections along with a matrix of statistics for each daemon.

``` r
status()
#> $connections
#> [1] 1
#>
#> $daemons
#>                                     i online instance assigned complete
#> abstract://bb7da8970444e783a3f63281 1      1        1        0        0
#> abstract://faf804d9f786b1ccc4f76628 2      1        1        0        0
#> abstract://e8e180caa09bc41f4067298a 3      1        1        0        0
#> abstract://3faa8cb0dadbe3c24fc35133 4      1        1        0        0
#> abstract://bdca3dd925f7416391b86f12 5      1        1        0        0
#> abstract://2e4db69b3b9c198610e51062 6      1        1        0        0
```

The default `dispatcher = "process"` creates a `dispatcher()` background process that connects to individual daemon processes on the local machine. This ensures that tasks are dispatched efficiently on a first-in first-out (FIFO) basis to daemons for processing. Tasks are queued at the dispatcher and sent to a daemon as soon as it can accept the task for immediate execution.
Dispatcher uses synchronisation primitives from [`nanonext`](https://doi.org/10.5281/zenodo.7903429), waiting upon rather than polling for tasks, which is efficient both in terms of consuming no resources while waiting, and also being fully synchronised with events (having no latency).

Alternatively, specifying `dispatcher = "thread"` runs dispatcher logic on a thread, a faster and more efficient alternative to the separate background process. This is a new feature that should be considered experimental.

``` r
daemons(0)
#> [1] 0
```

Set the number of daemons to zero to reset. This reverts to the default of creating a new background process for each 'mirai' request.

#### Without Dispatcher

Alternatively, specifying `dispatcher = "none"`, the background daemons connect directly to the host process.

``` r
daemons(6, dispatcher = "none")
#> [1] 6
```

Requesting the status now shows 6 connections, along with the host URL at `$daemons`.

``` r
status()
#> $connections
#> [1] 6
#>
#> $daemons
#> [1] "abstract://1ebf9fad5f9a05dfb8def571"
```

This implementation sends tasks immediately, and ensures that tasks are evenly-distributed amongst daemons. This means that optimal scheduling is not guaranteed as the duration of tasks cannot be known *a priori*. As an example, tasks could be queued at a daemon behind a long-running task, whilst other daemons are idle having already completed their tasks.

The advantage of this approach is that it is low-level and does not require an additional dispatcher process. It is well-suited to working with similar-length tasks, or where the number of concurrent tasks typically does not exceed available daemons.

#### Everywhere

`everywhere()` may be used to evaluate an expression on all connected daemons and persist the resultant state, regardless of a daemon's 'cleanup' setting.

``` r
everywhere(library(DBI))
```

The above keeps the [`DBI`](https://dbi.r-dbi.org/) package loaded for all evaluations.
Other types of setup task may also be performed, including making a common resource available, such as a database connection:

``` r
file <- tempfile()
everywhere(con <<- dbConnect(RSQLite::SQLite(), file), file = file)
```

By super-assignment, the connection 'con' will be available in the global environment of all daemon instances. Subsequent mirai calls may then make use of 'con'.

``` r
m <- mirai(capture.output(str(con)))
m[]
#> [1] "Formal class 'SQLiteConnection' [package \"RSQLite\"] with 8 slots"
#> [2] " ..@ ptr : "
#> [3] " ..@ dbname : chr \"/tmp/RtmpgPsUPA/file4e907e57a6b6\""
#> [4] " ..@ loadable.extensions: logi TRUE"
#> [5] " ..@ flags : int 70"
#> [6] " ..@ vfs : chr \"\""
#> [7] " ..@ ref : "
#> [8] " ..@ bigint : chr \"integer64\""
#> [9] " ..@ extended_types : logi FALSE"
```

Disconnect from the database everywhere, and set the number of daemons to zero to reset.

``` r
everywhere(dbDisconnect(con))

daemons(0)
#> [1] 0
```

#### With Method

`daemons()` has a `with()` method, which evaluates an expression with daemons created for the duration of the expression and automatically torn down upon completion. It was designed for the use case of running a Shiny app with the desired number of daemons.

``` r
with(daemons(4), shiny::runApp(app))
```

Note: in the above case, it is assumed the app is already created. Wrapping a call to `shiny::shinyApp()` would not work as `runApp()` is implicitly called when the app is printed, however printing occurs only after `with()` has returned, hence the app would run outside of the scope of the `with()` statement.

[« Back to ToC](#table-of-contents)

### Distributed Computing: Remote Daemons

The daemons interface may also be used to send tasks for computation to remote daemon processes on the network.

Call `daemons()` specifying 'url' as a character string such as 'tcp://10.75.32.70:5555', the address at which daemon processes should connect. Alternatively, use `host_url()` to automatically construct a valid URL.
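
As a brief sketch of the second option, the following constructs URLs for a fixed port using only the `port` and `tls` arguments that appear elsewhere in this vignette. The port number 5555 is an arbitrary choice, and the hostname or address embedded in the result depends on the machine.

``` r
library(mirai)

# URL(s) at which this machine may be reached on the chosen port
url_tcp <- host_url(port = 5555)

# secure equivalent, see the TLS section further below
url_tls <- host_url(tls = TRUE, port = 5555)

url_tcp
url_tls
```

Either value may then be supplied as the 'url' argument of `daemons()`.
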
IPv6 addresses are also supported and must be enclosed in square brackets `[]` to avoid confusion with the final colon separating the port. For example, port 5555 on the IPv6 address `::ffff:a6f:50d` would be specified as `tcp://[::ffff:a6f:50d]:5555`.

For options on actually launching the daemons, please see the next section.

#### Connecting to Remote Daemons Through Dispatcher

With the default `dispatcher = "process"` or alternatively `dispatcher = "thread"`, dispatcher listens to a vector of URLs that remote `daemon()` processes dial in to, with each daemon having its own unique URL.

It is recommended to use a websocket URL starting `ws://` instead of TCP in this scenario (used interchangeably with `tcp://`). A websocket URL supports a path after the port number, which can be made unique for each daemon. In this way a dispatcher can connect to an arbitrary number of daemons over a single port.

Supplying a vector of URLs allows the use of arbitrary port numbers / paths. 'n' does not need to be specified if it can be inferred from the length of the 'url' vector, for example:

``` r
daemons(url = c("ws://10.75.32.70:5566/cpu", "ws://10.75.32.70:5566/gpu", "ws://10.75.32.70:7788/1"))
```

Alternatively, below a single URL is supplied, along with `n = 4` to specify that the dispatcher should listen at 4 URLs. In such a case, the URLs are generated automatically: for a websocket URL an integer sequence is appended to the path (`/1` through `/4`), whereas for a TCP URL, as shown in the status output below, successive port numbers are used instead.

``` r
daemons(n = 4, url = host_url(port = 5555))
#> [1] 4
```

Requesting status on the host machine:

``` r
status()
#> $connections
#> [1] 1
#>
#> $daemons
#>                     i online instance assigned complete
#> tcp://hostname:5555 1      0        0        0        0
#> tcp://hostname:5556 2      0        0        0        0
#> tcp://hostname:5557 3      0        0        0        0
#> tcp://hostname:5558 4      0        0        0        0
```

As per the local case, `$connections` shows the single connection to dispatcher, however `$daemons` now provides a matrix of statistics for the remote daemons.

- `i` index number.
- `online` shows as 1 when there is an active connection, or else 0 if a daemon has yet to connect or has disconnected.
- `instance` increments by 1 every time there is a new connection at a URL. This counter is designed to track new daemon instances connecting after previous ones have ended (due to time-outs etc.). The count becomes negative immediately after a URL is regenerated by `saisei()`, but increments again once a new daemon connects.
- `assigned` shows the cumulative number of tasks assigned to the daemon.
- `complete` shows the cumulative number of tasks completed by the daemon.

Dispatcher automatically adjusts to the number of daemons actually connected. Hence it is possible to dynamically scale up or down the number of daemons according to requirements (limited to the 'n' URLs assigned).

To reset all connections and revert to default behaviour:

``` r
daemons(0)
#> [1] 0
```

Closing the connection causes the dispatcher to exit automatically, and in turn all connected daemons when their respective connections with the dispatcher are terminated.

#### Connecting to Remote Daemons Directly

By specifying `dispatcher = "none"`, remote daemons connect directly to the host process. The host listens at a single URL, and distributes tasks to all connected daemons.

``` r
daemons(url = host_url(), dispatcher = "none")
#> [1] 0
```

Note that above, calling `host_url()` without a port value uses the default of '0'. This is a wildcard value that will automatically cause a free ephemeral port to be assigned. The actual assigned port may be queried at any time via `status()`:

``` r
status()
#> $connections
#> [1] 0
#>
#> $daemons
#> [1] "tcp://hostname:42761"
```

The number of daemons connecting to the host URL is not limited and network resources may be added or removed at any time, with tasks automatically distributed to all connected daemons.

From the status query above, `$connections` shows the actual number of connected daemons.
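
As remote daemons may take some time to dial in, it can be useful to hold off sending tasks until a minimum number have connected. The following is a minimal sketch for the direct (no dispatcher) case only, where `$connections` counts daemons; with a dispatcher it counts the single dispatcher connection instead. `wait_for_daemons()` and its arguments are hypothetical names introduced for illustration and are not part of mirai.

``` r
library(mirai)

# Hypothetical helper (not part of mirai): poll status() until at least `n`
# daemons have connected, giving up after `timeout` seconds.
# Returns TRUE if the target was reached, FALSE on timeout.
wait_for_daemons <- function(n, timeout = 30, interval = 0.1) {
  deadline <- Sys.time() + timeout
  while (status()$connections < n) {
    if (Sys.time() >= deadline) return(FALSE)
    Sys.sleep(interval)
  }
  TRUE
}

# Usage sketch: after daemons(url = host_url(), dispatcher = "none") and
# launching two remote daemons, wait up to a minute for both to connect:
# wait_for_daemons(2, timeout = 60)
```

Polling at a short interval is a simplification made for brevity in this sketch.
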
To reset all connections and revert to default behaviour:

``` r
daemons(0)
#> [1] 0
```

This causes all connected daemons to exit automatically.

[« Back to ToC](#table-of-contents)

### Distributed Computing: Launching Daemons

To launch remote daemons, supply a remote launch configuration to the 'remote' argument of `daemons()` when setting up daemons, or `launch_remote()` at any time afterwards.

`ssh_config()` may be used to generate a remote launch configuration if there is SSH access to the remote machine, or else `remote_config()` provides a flexible method for generating a configuration involving a custom resource manager / application.

#### SSH Direct Connection

The first example below launches 4 daemons on the machine 10.75.32.90 (using the default SSH port of 22 as this was not specified), connecting back to the dispatcher URLs:

``` r
daemons(
  n = 4,
  url = host_url(ws = TRUE, port = 5555),
  remote = ssh_config(remotes = "ssh://10.75.32.90")
)
```

The second example below launches one daemon on each of 10.75.32.90 and 10.75.32.91 using the custom SSH port of 222:

``` r
daemons(
  n = 2,
  url = host_url(ws = TRUE, port = 5555),
  remote = ssh_config(c("ssh://10.75.32.90:222", "ssh://10.75.32.91:222"))
)
```

In the above examples, as the remote daemons connect back directly, port 5555 on the local machine must be open to incoming connections from the remote addresses.

#### SSH Tunnelling

Use of SSH tunnelling provides a convenient way to launch remote daemons without requiring the remote machine to be able to access the host. Often firewall configurations or security policies may prevent opening a port to accept outside connections.

In these cases SSH tunnelling offers a solution by creating a tunnel once the initial SSH connection is made. For simplicity, this SSH tunnelling implementation uses the same port on both the side of the host and that of the corresponding node. SSH key-based authentication must also already be in place.
Tunnelling requires the hostname for 'url' specified when setting up daemons to be either '127.0.0.1' or 'localhost'. This is as the tunnel is created between 127.0.0.1:port or equivalently localhost:port on each machine. The host listens to its localhost:port and the remotes each dial into localhost:port on their own respective machines.

The below example launches 2 nodes on the remote machine 10.75.32.90 using SSH tunnelling over port 5555 ('url' hostname is specified as 'localhost'):

``` r
daemons(
  url = "tcp://localhost:5555",
  remote = ssh_config(
    remotes = c("ssh://10.75.32.90", "ssh://10.75.32.90"),
    tunnel = TRUE
  )
)
```

#### Cluster Resource Managers

`remote_config()` may be used to run a command to deploy daemons using a resource manager.

Taking Slurm as an example, the following uses `sbatch` to launch 2 daemons on the cluster, with some additional arguments to `sbatch` specifying the resource allocation:

``` r
daemons(
  n = 2,
  url = host_url(ws = TRUE),
  remote = remote_config(
    command = "sbatch",
    args = c("--mem 512", "-n 1", "--wrap", "."),
    rscript = file.path(R.home("bin"), "Rscript"),
    quote = TRUE
  )
)
```

#### Manual Deployment

As an alternative to automated launches, calling `launch_remote()` without specifying 'remote' may be used to return the shell commands for deploying daemons manually. The printed return values may be copy / pasted directly to a remote machine.

``` r
daemons(n = 2, url = host_url())
#> [1] 2
launch_remote(1:2)
#> [1]
#> Rscript -e 'mirai::daemon("tcp://hostname:34471",rs=c(10407,-1464045629,542467624,-1598368247,142924342,1806531391,-2073238412))'
#>
#> [2]
#> Rscript -e 'mirai::daemon("tcp://hostname:45693",rs=c(10407,-1513314584,1587671761,1092626800,-185579079,11250700,-1550799561))'
daemons(0)
#> [1] 0
```

Note that `daemons()` should be set up on the host machine before launching `daemon()` on remote resources, otherwise the daemon instances will exit if a connection is not immediately available.
Alternatively, specifying the argument `autoexit = FALSE` will allow daemons to wait (indefinitely) for a connection to become available.

[« Back to ToC](#table-of-contents)

### Distributed Computing: TLS Secure Connections

TLS is available as an option to secure communications from the local machine to remote daemons.

#### Zero-configuration

An automatic zero-configuration default is implemented. Simply specify a secure URL of the form `wss://` or `tls+tcp://` when setting daemons, or use `host_url(tls = TRUE)`, for example:

``` r
daemons(n = 4, url = host_url(ws = TRUE, tls = TRUE))
#> [1] 4
```

Single-use keys and certificates are automatically generated and configured, without requiring any further intervention. The private key is always retained on the host machine and never transmitted.

The generated self-signed certificate is available via `launch_remote()`. This function conveniently constructs the full shell command to launch a daemon, including the correctly specified 'tls' argument to `daemon()`.

``` r launch_remote(1) #> [1] #> Rscript -e 'mirai::daemon("wss://hostname:35957/1",tls=c("-----BEGIN CERTIFICATE----- #> MIIFNzCCAx+gAwIBAgIBATANBgkqhkiG9w0BAQsFADAzMREwDwYDVQQDDAhrdW1h #> bW90bzERMA8GA1UECgwITmFub25leHQxCzAJBgNVBAYTAkpQMB4XDTAxMDEwMTAw #> MDAwMFoXDTMwMTIzMTIzNTk1OVowMzERMA8GA1UEAwwIa3VtYW1vdG8xETAPBgNV #> BAoMCE5hbm9uZXh0MQswCQYDVQQGEwJKUDCCAiIwDQYJKoZIhvcNAQEBBQADggIP #> ADCCAgoCggIBAKgWqagLiM5FC5EuvEWA3drnGLa9E5oPUq9ITajT7AYtSE8gW9WZ #> tIWK631uyeJyxZ8k+D/REkV7dJJONDKZ4csyNgoVSoLlswW7OuqoOxT7wA6laa5b #> ld8fRKDQkRnhV/qE4YQvPvH1TXUBOqGfg8EdcusN3p17vB2j17RtlnlNSiTGavkn #> fz8xsNsowJjWLYz05ZMzkiL37cjccDgP0/gG97w4aRSZtaLJSWNs/EKjzKhIe2iI #> kMFafiG9FABJF5qhh5L0vWdvfncwcWo2wQhvDTXFhzWXPYhwZU0o8CDAfkESIuF7 #> VGNLeQsL9lnFbiJiOc4c1IPovxExl04bLvxQQ++iadgNQYwabXNkYJSNdIm/Txdq #> SFIhmEvfJf5sQva3md22mDB40zxEo/STaOeLxxDhlqjbMFzN3xLgxCb7AhyIG12e #> BGWzPuGZBxvzCFqYUd9x/GLmdsA99tGPrEBrNhASmb39X4MjG/cThFyhJfRrr1Ez #> Aifs5XZjamPXB3JR0kTIt42OuMmxytx3eifjZEalgUpNUlyHIl0PZoTtDSHX/p8X #> DkG/c0/3faFujkJbPitljqzZyoEAdVHbmEzcDAUkqtAZ9NSXEIs1qrMji8+z9+85 #> t0atsUTk8JpYNUbahg2m6/Nmr0xE/ZRVAzjBw2EMQssCA+5bXZBguxB9AgMBAAGj #> VjBUMBIGA1UdEwEB/wQIMAYBAf8CAQAwHQYDVR0OBBYEFIjQkLX0q5tObwAcmh45 #> +MaTpFpkMB8GA1UdIwQYMBaAFIjQkLX0q5tObwAcmh45+MaTpFpkMA0GCSqGSIb3 #> DQEBCwUAA4ICAQCdXAXqpdnhoPZ6mTjk2ecuqzoZ/+xSRj9D2BAGWsBVHYOUxw6v #> 5XnosewJop8Jd0GLAs1DjjVkxuBEF1tJiN6qEaoqVOTBoO8t3GL4iu6jb/LmC759 #> BsygcuPQZCTQW70/Fhjp6/Vnklrokfu5FW5oYPdq3EKk4y2CEON0Jkc18i91MH7R #> yFE/yALuDcNMi411UeB9mZt47mZZ12AyX/m/ftRjCONWwvofnDJq3kQFqF2oHwW/ #> 0KklUQauvpCk4phJuxSNrxFg35dAqhHccFSBFNcqK1HGPLK2lZVQqepa39ls7Zhb #> fbZqxN0jaJtQ2MzPgOUogejGS0+dWdUZUTdp8h63rthtCM7kQ9FzEf/TmQcbZwIO #> MVcGQrTPtoP+8VHuClyizHE+Ct7wdqTjMJEmR+dvNQMx1EAWr2kuYZ1XGHsAfDlM #> ZhIy04rDO8ghT7TImEoi8Z36prDvulETbN0t2RNNXqE4nc/fNEl/WNAx61PeZdZ7 #> 0uktJVR7ekSr6rQILCLYagEuCOhlc7Mvrxf0LIXQCuGmzDxIpaKuC3UbXNBwAGDl #> THfQzCmb9+tO9EO5nbs9C7CHNhnWtw2sgh+uRlqgg2ps10/QHvL9rQAgcThwmkAT #> 
gBVWwO072B2m5H+2BIyi/05hBfW0r0z3b49ay3QspwSLktY6X472jV0GPg==
#> -----END CERTIFICATE-----
#> ",""),rs=c(10407,-455422690,750700295,1273478044,1219766253,-2043149878,1421521091))'
```

The printed value may be deployed directly on a remote machine.

[« Back to ToC](#table-of-contents)

#### CA Signed Certificates

As an alternative to the zero-configuration default, a certificate may also be generated via a Certificate Signing Request (CSR) to a Certificate Authority (CA), which may be a public CA or a CA internal to an organisation.

1. Generate a private key and CSR. The following resources describe how to do so:
   - using Mbed TLS:
   - using OpenSSL: (Chapter 1.2 Key and Certificate Management)
2. Provide the generated CSR to the CA for it to sign a new TLS certificate.
   - The common name (CN) of the certificate must be identical to the hostname or IP address actually used for the connection. As this is verified, it will fail if not the same.
   - The received certificate should comprise a block of cipher text between the markers `-----BEGIN CERTIFICATE-----` and `-----END CERTIFICATE-----`. Make sure to request the certificate in the PEM format. If only available in other formats, the TLS library used should usually provide conversion utilities.
   - Check also that the private key is a block of cipher text between the markers `-----BEGIN PRIVATE KEY-----` and `-----END PRIVATE KEY-----`.
3. When setting daemons, the TLS certificate and private key should be provided to the 'tls' argument of `daemons()`.
   - If the certificate and private key have been imported as character strings `cert` and `key` respectively, then the 'tls' argument may be specified as the character vector `c(cert, key)`.
   - Alternatively, the certificate may be copied to a new text file, with the private key appended, in which case the path/filename of this file may be provided to the 'tls' argument.
4. 
When launching daemons, the certificate chain to the CA should be supplied to the 'tls' argument of `daemon()` or `launch_remote()`.
   - The certificate chain should comprise multiple certificates, each between `-----BEGIN CERTIFICATE-----` and `-----END CERTIFICATE-----` markers. The first one should be the newly-generated TLS certificate, the same supplied to `daemons()`, and the final one should be a CA root certificate.
   - These are the only certificates required if the certificate was signed directly by a CA. If not, then the intermediate certificates should be included in a certificate chain that starts with the TLS certificate and ends with the certificate of the CA.
   - If these are concatenated together as a single character string `certchain`, then the character vector comprising this and an empty character string `c(certchain, "")` may be supplied to the relevant 'tls' argument.
   - Alternatively, if these are written to a file (and the file replicated on the remote machines), then the 'tls' argument may also be specified as a path/filename (assuming these are the same on each machine).

[« Back to ToC](#table-of-contents)

### Compute Profiles

The `daemons()` interface also allows the specification of compute profiles for managing tasks with heterogeneous compute requirements:

- send tasks to different daemons or clusters of daemons with the appropriate specifications (in terms of CPUs / memory / GPU / accelerators etc.)
- split tasks between local and remote computation

Simply specify the argument `.compute` when calling `daemons()` with a profile name (which is 'default' for the default profile). The daemons settings are saved under the named profile.

To create a 'mirai' task using a specific compute profile, specify the '.compute' argument to `mirai()`, which defaults to the 'default' compute profile.

Similarly, functions such as `status()`, `launch_local()` or `launch_remote()` should be specified with the desired '.compute' argument.
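
The following minimal sketch sets up two local profiles and sends one task to each. The profile names 'cpu' and 'gpu' are purely illustrative; in practice the two profiles would typically point to daemons on differently-specified machines.

``` r
library(mirai)

# one local daemon under each (illustratively named) profile
daemons(1, .compute = "cpu")
daemons(1, .compute = "gpu")

# each task is sent only to the daemons of the named profile
m_cpu <- mirai(Sys.getpid(), .compute = "cpu")
m_gpu <- mirai(Sys.getpid(), .compute = "gpu")

# the process IDs differ, as the tasks ran on separate daemons
pids <- c(cpu = m_cpu[], gpu = m_gpu[])
pids

# each profile is queried and reset independently
status(.compute = "cpu")$connections
daemons(0, .compute = "cpu")
daemons(0, .compute = "gpu")
```
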
[« Back to ToC](#table-of-contents)

### Errors, Interrupts and Timeouts

If execution in a mirai fails, the error message is returned as a character string of class 'miraiError' and 'errorValue' to facilitate debugging. `is_mirai_error()` may be used to test for mirai execution errors.

``` r
m1 <- mirai(stop("occurred with a custom message", call. = FALSE))
m1[]
#> 'miraiError' chr Error: occurred with a custom message

m2 <- mirai(mirai::mirai())
m2[]
#> 'miraiError' chr Error in mirai::mirai(): missing expression, perhaps wrap in {}?

is_mirai_error(m2$data)
#> [1] TRUE
is_error_value(m2$data)
#> [1] TRUE
```

A full stack trace of evaluation within the mirai is recorded and accessible at `$stack.trace` on the error object.

``` r
f <- function(x) if (x > 0) stop("positive")

m3 <- mirai({f(-1); f(1)}, f = f)
m3[]
#> 'miraiError' chr Error in f(1): positive

m3$data$stack.trace
#> [[1]]
#> [1] "stop(\"positive\")"
#>
#> [[2]]
#> [1] "f(1)"
```

If a daemon instance is sent a user interrupt, the mirai will resolve to an object of class 'miraiInterrupt' and 'errorValue'. `is_mirai_interrupt()` may be used to test for such interrupts.

``` r
m4 <- mirai(rlang::interrupt()) # simulates a user interrupt
is_mirai_interrupt(m4[])
#> [1] TRUE
```

If execution of a mirai surpasses the timeout set via the '.timeout' argument, the mirai will resolve to an 'errorValue' of 5L (timed out). This can, amongst other things, guard against mirai processes that have the potential to hang and never return.

``` r
m4 <- mirai(nanonext::msleep(1000), .timeout = 500)
m4[]
#> 'errorValue' int 5 | Timed out

is_mirai_error(m4$data)
#> [1] FALSE
is_mirai_interrupt(m4$data)
#> [1] FALSE
is_error_value(m4$data)
#> [1] TRUE
```

`is_error_value()` tests for all mirai execution errors, user interrupts and timeouts.

[« Back to ToC](#table-of-contents)

### Serialization: Arrow, polars and beyond

Native R serialization is used for sending data between host and daemons.
Some R objects by their nature cannot be serialized, such as those accessed via an external pointer. In these cases, performing 'mirai' operations on them would normally error.

Using the [`arrow`](https://arrow.apache.org/docs/r/) package as an example:

``` r
library(arrow, warn.conflicts = FALSE)
daemons(2)
#> [1] 2
everywhere(library(arrow))

x <- as_arrow_table(iris)

m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> 'miraiError' chr Error: Invalid , external pointer to null
```

However, `serial_config()` can be used to create custom serialization configurations, specifying functions that hook into R's native serialization mechanism for reference objects ('refhooks').

This configuration can then be specified as part of an `everywhere()` call via its '.serial' argument.

``` r
cfg <- serial_config(
  class = "ArrowTabular",
  sfunc = arrow::write_to_raw,
  ufunc = function(x) arrow::read_ipc_stream(x, as_data_frame = FALSE)
)

daemons(2)
#> [1] 2
everywhere(library(arrow), .serial = cfg)

m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> Table
#> 6 rows x 5 columns
#> $Sepal.Length
#> $Sepal.Width
#> $Petal.Length
#> $Petal.Width
#> $Species >
#>
#> See $metadata for additional Schema metadata
#>
#> $b
#> [1] "some text"
```

It can be seen that this time, the arrow table is seamlessly handled in the 'mirai' process. This is the case even when the object is deeply nested inside lists or other structures.

To change registered serialization functions, just call `everywhere()` again supplying the new functions. As an example, we can switch to using [`polars`](https://pola-rs.github.io/r-polars/), a 'lightning fast' dataframe library written in Rust (requires `polars` >= 0.16.4).
``` r
everywhere(
  {},
  .serial = serial_config(
    class = "RPolarsDataFrame",
    sfunc = function(x) polars::as_polars_df(x)$to_raw_ipc(),
    ufunc = polars::pl$read_ipc
  )
)

x <- polars::as_polars_df(iris)

m <- mirai(list(a = head(x), b = "some text"), x = x)
m[]
#> $a
#> shape: (6, 5)
#> ┌──────────────┬─────────────┬──────────────┬─────────────┬─────────┐
#> │ Sepal.Length ┆ Sepal.Width ┆ Petal.Length ┆ Petal.Width ┆ Species │
#> │ ---          ┆ ---         ┆ ---          ┆ ---         ┆ ---     │
#> │ f64          ┆ f64         ┆ f64          ┆ f64         ┆ cat     │
#> ╞══════════════╪═════════════╪══════════════╪═════════════╪═════════╡
#> │ 5.1          ┆ 3.5         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 4.9          ┆ 3.0         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 4.7          ┆ 3.2         ┆ 1.3          ┆ 0.2         ┆ setosa  │
#> │ 4.6          ┆ 3.1         ┆ 1.5          ┆ 0.2         ┆ setosa  │
#> │ 5.0          ┆ 3.6         ┆ 1.4          ┆ 0.2         ┆ setosa  │
#> │ 5.4          ┆ 3.9         ┆ 1.7          ┆ 0.4         ┆ setosa  │
#> └──────────────┴─────────────┴──────────────┴─────────────┴─────────┘
#>
#> $b
#> [1] "some text"
```

To cancel serialization functions entirely, supply an empty list to the '.serial' argument of `everywhere()`:

``` r
everywhere({}, .serial = list())

daemons(0)
#> [1] 0
```

The 'vec' argument to `serial_config()` may be specified as `TRUE` if the serialization functions are vectorized and take lists of objects, as is the case for [`safetensors`](https://mlverse.github.io/safetensors/), used for serialization in [`torch`](https://torch.mlverse.org/).

Please refer to the [torch vignette](https://shikokuchuo.net/mirai/articles/torch.html) for further examples.

[« Back to ToC](#table-of-contents)

### Asynchronous Parallel Map

`mirai_map()` performs asynchronous parallel/distributed map using `mirai`.

This function is similar to `purrr::map()`, but returns a 'mirai_map' object. It is also more advanced as it allows multiple map over the rows of a dataframe or matrix.

The results of a mirai_map `x` may be collected using `x[]`. This waits for all asynchronous operations to complete if still in progress.

#### Key advantages:

1. 
Returns immediately with all evaluations taking place asynchronously. Printing a 'mirai_map' object shows the current completion progress.
1. The '.promise' argument allows a promise to be registered against each mirai, which can be used to perform side-effects.
1. Returns evaluation errors as 'miraiError' or 'errorValue' as the case may be, rather than causing the entire operation to fail. This allows more efficient recovery from partial failure.
1. Does not rely on a 'chunking' algorithm that attempts to split work into batches according to the number of available daemons, as implemented for example in the `parallel` package. Chunking cannot take into account varying or unpredictable compute times over the indices. It can be optimal to rely on `mirai` for scheduling instead. This is demonstrated in the example below.

``` r
library(mirai)
library(parallel)
cl <- make_cluster(4)
daemons(4)
#> [1] 4
vec <- c(1, 1, 4, 4, 1, 1, 1, 1)
system.time(mirai_map(vec, Sys.sleep)[])
#>    user  system elapsed
#>   0.004   0.002   4.006
system.time(parLapply(cl, vec, Sys.sleep))
#>    user  system elapsed
#>   0.005   0.007   8.010
```

`.args` is used to specify further constant arguments to `.f` - the 'mean' and 'sd' in the example below:

``` r
with(
  daemons(3, dispatcher = "none"),
  mirai_map(1:3, rnorm, .args = list(mean = 20, sd = 2))[]
)
#> [[1]]
#> [1] 21.88944
#>
#> [[2]]
#> [1] 17.87407 20.08740
#>
#> [[3]]
#> [1] 21.64744 18.48347 16.43075
```

Use `...` to further specify objects referenced but not defined in `.f` - the 'do' in the anonymous function below:

``` r
ml <- mirai_map(
  c(a = 1, b = 2, c = 3),
  function(x) do(x, as.logical(x %% 2)),
  do = nanonext::random
)
#> Warning: mirai is launching one local daemon for a map operation as none previously set
ml
#> < mirai map [1/3] >
ml[]
#> $a
#> [1] "88"
#>
#> $b
#> [1] 39 3b
#>
#> $c
#> [1] "1b5f6c"
```

Use of `mirai_map()` assumes that `daemons()` have previously been set.
If not then one (non-dispatcher) daemon is set to allow the function to proceed. This ensures safe behaviour, but is unlikely to be optimal, so please ensure daemons are set beforehand.

#### Collecting Results

When collecting the results, optionally specify arguments to `[]`:

- `x[.flat]` collects and flattens the results, checking that they are of the same type to avoid coercion.
- `x[.progress]` collects results whilst showing a simple text progress indicator of parts completed of the total.
- `x[.progress_cli]` is an alternative that uses `cli` progress bars, where available, showing completion percentage and ETA.
- `x[.stop]` collects the results applying early stopping, which stops at the first failure and cancels remaining computations (note: computations already in progress continue to completion, but their results are not collected).

Combinations of the above may be supplied in the fashion of `x[.stop, .progress]`.

``` r
tryCatch(
  mirai_map(list(a = 1, b = "a", c = 3), sum)[.stop],
  error = identity
)
#>

with(
  daemons(4, dispatcher = "none"),
  mirai_map(c(0.1, 0.2, 0.3), Sys.sleep)[.progress, .flat]
)
#> NULL
```

#### Multiple Map

Multiple map is performed over the **rows** of a dataframe or matrix, as this is most often the desired behaviour.

This allows map over 2 or more arguments by specifying a dataframe. One of those may be an index value for indexed map.

``` r
fruit <- c("melon", "grapes", "coconut")

# create a dataframe for indexed map:
df <- data.frame(i = seq_along(fruit), fruit = fruit)

with(
  daemons(3, dispatcher = "none"),
  mirai_map(df, sprintf, .args = list(fmt = "%d. %s"))[.flat]
)
#> [1] "1. melon"   "2. grapes"  "3. coconut"
```

As a dataframe often contains columns of differing type, it is unusual to want to map over the **columns**, however this is possible by simply transforming it beforehand into a list using `as.list()`.

Similarly, the behaviour of `lapply()` or `purrr::map()` on a matrix is the same as that for a vector.
`mirai_map()` on the other hand does take into account the fact that the matrix has dimensions, and maps over its **rows**, consistent with the behaviour for dataframes. If instead mapping over the columns is desired, simply take the transpose of the matrix beforehand using `t()`.

[« Back to ToC](#table-of-contents)

### Using mirai in a Package

mirai as a framework is designed to support completely transparent and inter-operable use within packages. A core design precept of not relying on global options or environment variables minimises the likelihood of conflict between use by different packages. There are hence few requirements of package authors. The following points may, however, be useful:

- `daemons()` settings should usually be left to end-users. Users may be guided to mirai documentation if desired. If, however, a package wishes to set default settings, for example, then `daemons()` should always be called specifying `force = FALSE`. This ensures that any prior user settings are respected, and that daemons set elsewhere are not prematurely terminated.
- Calling package functions in a mirai requires namespacing the call, or alternatively exporting the function, i.e.
  ```r
  mirai(mypkg::my_func())
  ```
  or
  ```r
  mirai(my_func(), .args = list(my_func = my_func))
  ```
- The shape and contents of the `status()` daemons matrix must not be relied upon, as this user interface is subject to change at any time. There is a developer interface `nextget()`, for querying values such as 'urls' described in the function documentation. Note: only the specifically-documented values are supported interfaces.
- This is recommended practice in any case, but especially relevant for package developers: the functions `unresolved()`, `is_error_value()`, `is_mirai_error()`, and `is_mirai_interrupt()` should be used to test for the relevant state of a mirai or its value. The characteristics of how they are currently implemented, e.g.
as a logical NA for an 'unresolvedValue', should not be relied upon, as these are subject to change.
- Testing on CRAN should respect its 2-core usage limit. In practice, this means limiting tests to using one daemon (with `dispatcher = "none"`) to ensure that only one additional process is used. Always reset daemons when done and then allow at least a one-second sleep to ensure all background processes have properly exited. These limits apply only to tests run on CRAN machines and not those run elsewhere.

[« Back to ToC](#table-of-contents)
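
As a minimal sketch of the CRAN testing guidance in the last bullet above, a test body might be structured as follows. This assumes the version of mirai documented in this vignette (where `dispatcher = "none"` is accepted), and the expression `1 + 1` is only a placeholder for a call into the package under test.

``` r
library(mirai)

# A single daemon without dispatcher: only one additional process is started
daemons(1, dispatcher = "none")

# Placeholder computation (an assumption), standing in for package code
m <- mirai(1 + 1)
res <- m[]  # wait for and collect the value

# Test state through the exported helpers, not the internal representation
stopifnot(!unresolved(m), !is_error_value(res))

# Reset daemons, then pause so that background processes can exit
daemons(0)
Sys.sleep(1)
```

Checking state with `unresolved()` and `is_error_value()` keeps the test independent of how unresolved or error values happen to be implemented.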