diff --git a/DESCRIPTION b/DESCRIPTION index 393014bd..29a41abd 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -21,12 +21,14 @@ Imports: magrittr (>= 1.5.0), rlang (>= 1.1.1), vctrs (>= 0.6.3) -Suggests: +Suggests: + carrier (>= 0.1.1), covr, dplyr (>= 0.7.8), httr, knitr, lubridate, + mirai (>= 2.0.1.9005), rmarkdown, testthat (>= 3.0.0), tibble, @@ -43,3 +45,4 @@ Config/testthat/parallel: TRUE Encoding: UTF-8 Roxygen: list(markdown = TRUE) RoxygenNote: 7.3.2 +Remotes: shikokuchuo/mirai diff --git a/NEWS.md b/NEWS.md index a224caa1..1b47cda6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,10 @@ # purrr (development version) +* purrr gains the capacity for parallel and distributed map, powered by the + mirai package. The argument `.parallel` has been added to `map()`, `map2()`, + `pmap()` and variants to enable this. See `?parallelization` for more details + (@shikokuchuo, #1163). + # purrr 1.0.4 # purrr 1.0.3 diff --git a/R/map-if-at.R b/R/map-if-at.R index 8131fdee..e9eb53fb 100644 --- a/R/map-if-at.R +++ b/R/map-if-at.R @@ -57,11 +57,11 @@ map_if <- function(.x, .p, .f, ..., .else = NULL) { #' installed, you can use `vars()` and tidyselect helpers to select #' elements. #' @export -map_at <- function(.x, .at, .f, ..., .progress = FALSE) { +map_at <- function(.x, .at, .f, ..., .parallel = FALSE, .progress = FALSE) { where <- where_at(.x, .at, user_env = caller_env()) out <- vector("list", length(.x)) - out[where] <- map(.x[where], .f, ..., .progress = .progress) + out[where] <- map(.x[where], .f, ..., .parallel = .parallel, .progress = .progress) out[!where] <- .x[!where] set_names(out, names(.x)) diff --git a/R/map.R b/R/map.R index 9838c489..597e97f1 100644 --- a/R/map.R +++ b/R/map.R @@ -46,6 +46,17 @@ #' This makes it easier to understand which arguments belong to which #' function and will tend to yield better error messages. #' +#' @param .parallel `r lifecycle::badge("experimental")` Whether to map in +#' parallel. 
Use `TRUE` to parallelize using the \CRANpkg{mirai} package. +#' * Set up parallelization in your session beforehand using +#' [mirai::daemons()]. +#' * Non-package functions are auto-crated for sharing with parallel +#' processes. You may [carrier::crate()] your function explicitly if you need +#' to supply additional objects along with your function. +#' * Use of `...` is not permitted in this context, [carrier::crate()] an +#' anonymous function instead. +#' +#' See [parallelization] for more details. #' @param .progress Whether to show a progress bar. Use `TRUE` to turn on #' a basic progress bar, use a string to give it a name, or see #' [progress_bars] for more details. @@ -125,50 +136,69 @@ #' map(\(df) lm(mpg ~ wt, data = df)) |> #' map(summary) |> #' map_dbl("r.squared") -map <- function(.x, .f, ..., .progress = FALSE) { - map_("list", .x, .f, ..., .progress = .progress) +#' +#' @examplesIf interactive() && requireNamespace("mirai", quietly = TRUE) && requireNamespace("carrier", quietly = TRUE) +#' # Run in interactive sessions only as spawns additional processes +#' +#' # To use parallelized map, set daemons (number of parallel processes) first: +#' mirai::daemons(2) +#' +#' mtcars |> map_dbl(sum, .parallel = TRUE) +#' +#' 1:10 |> +#' map(function(x) stats::rnorm(10, mean = x), .parallel = TRUE) |> +#' map_dbl(mean, .parallel = TRUE) +#' +#' mirai::daemons(0) +#' +map <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) { + map_("list", .x, .f, ..., .parallel = .parallel, .progress = .progress) } #' @rdname map #' @export -map_lgl <- function(.x, .f, ..., .progress = FALSE) { - map_("logical", .x, .f, ..., .progress = .progress) +map_lgl <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) { + map_("logical", .x, .f, ..., .parallel = .parallel, .progress = .progress) } #' @rdname map #' @export -map_int <- function(.x, .f, ..., .progress = FALSE) { - map_("integer", .x, .f, ..., .progress = .progress) +map_int <- function(.x, .f, ..., 
.parallel = FALSE, .progress = FALSE) { + map_("integer", .x, .f, ..., .parallel = .parallel, .progress = .progress) } #' @rdname map #' @export -map_dbl <- function(.x, .f, ..., .progress = FALSE) { - map_("double", .x, .f, ..., .progress = .progress) +map_dbl <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) { + map_("double", .x, .f, ..., .parallel = .parallel, .progress = .progress) } #' @rdname map #' @export -map_chr <- function(.x, .f, ..., .progress = FALSE) { +map_chr <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) { local_deprecation_user_env() - map_("character", .x, .f, ..., .progress = .progress) + map_("character", .x, .f, ..., .parallel = .parallel, .progress = .progress) } map_ <- function(.type, .x, .f, ..., + .parallel = FALSE, .progress = FALSE, .purrr_user_env = caller_env(2), .purrr_error_call = caller_env()) { .x <- vctrs_vec_compat(.x, .purrr_user_env) vec_assert(.x, arg = ".x", call = .purrr_error_call) - n <- vec_size(.x) + .f <- as_mapper(.f, ...) - names <- vec_names(.x) + if (isTRUE(.parallel)) { + return(mmap_(.x, .f, .progress, .type, .purrr_error_call, ...)) + } - .f <- as_mapper(.f, ...) + n <- vec_size(.x) + names <- vec_names(.x) i <- 0L with_indexed_errors( @@ -179,21 +209,62 @@ map_ <- function(.type, ) } +mmap_ <- function(.x, .f, .progress, .type, error_call, ...) { + + if (is.null(the$packages_installed)) { + rlang::check_installed(c("mirai", "carrier"), reason = "for parallel map.") + the$packages_installed <- TRUE + } + + if (is.null(mirai::nextget("n"))) { + cli::cli_abort( + "No daemons set - use e.g. 
{.run mirai::daemons(6)} to set 6 local daemons.", + call = error_call + ) + } + if (...length()) { + cli::cli_abort( + "Don't use `...` with `.parallel = TRUE`.", + call = error_call + ) + } + + if (!isNamespace(topenv(environment(.f))) && !carrier::is_crate(.f)) { + .f <- carrier::crate(rlang::set_env(.f)) + cli::cli_inform(c( + v = "Automatically crated `.f`: {format(lobstr::obj_size(.f))}" + )) + } + + m <- mirai::mirai_map(.x, .f) + + options <- c(".stop", if (isTRUE(.progress)) ".progress") + x <- with_parallel_indexed_errors( + mirai::collect_mirai(m, options = options), + interrupt_expr = mirai::stop_mirai(m), + error_call = error_call + ) + if (.type != "list") { + x <- simplify_impl(x, ptype = vector(mode = .type), error_call = error_call) + } + x + +} #' @rdname map #' @param .ptype If `NULL`, the default, the output type is the common type #' of the elements of the result. Otherwise, supply a "prototype" giving #' the desired type of output. #' @export -map_vec <- function(.x, .f, ..., .ptype = NULL, .progress = FALSE) { - out <- map(.x, .f, ..., .progress = .progress) +map_vec <- function(.x, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) { + out <- map(.x, .f, ..., .parallel = .parallel, .progress = .progress) simplify_impl(out, ptype = .ptype) } #' @rdname map #' @export -walk <- function(.x, .f, ..., .progress = FALSE) { - map(.x, .f, ..., .progress = .progress) +walk <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) { + map(.x, .f, ..., .parallel = .parallel, .progress = .progress) invisible(.x) } @@ -225,6 +296,30 @@ with_indexed_errors <- function(expr, i, names = NULL, error_call = caller_env() ) } +with_parallel_indexed_errors <- function(expr, interrupt_expr = NULL, error_call = caller_env()) { + withCallingHandlers( + expr, + error = function(cnd) { + location <- cnd$location + iname <- cnd$name + cli::cli_abort( + c( + i = "In index: {location}.", + i = if (length(iname) && nzchar(iname)) "With name: {iname}." 
+ ), + location = location, + name = iname, + parent = cnd$parent, + call = error_call, + class = "purrr_error_indexed" + ) + }, + interrupt = function(cnd) { + interrupt_expr + } + ) +} + #' Indexed errors (`purrr_error_indexed`) #' #' @description diff --git a/R/map2.R b/R/map2.R index f05488ef..d34bfbb3 100644 --- a/R/map2.R +++ b/R/map2.R @@ -30,28 +30,28 @@ #' by_cyl <- mtcars |> split(mtcars$cyl) #' mods <- by_cyl |> map(\(df) lm(mpg ~ wt, data = df)) #' map2(mods, by_cyl, predict) -map2 <- function(.x, .y, .f, ..., .progress = FALSE) { - map2_("list", .x, .y, .f, ..., .progress = .progress) +map2 <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) { + map2_("list", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname map2 -map2_lgl <- function(.x, .y, .f, ..., .progress = FALSE) { - map2_("logical", .x, .y, .f, ..., .progress = .progress) +map2_lgl <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) { + map2_("logical", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname map2 -map2_int <- function(.x, .y, .f, ..., .progress = FALSE) { - map2_("integer", .x, .y, .f, ..., .progress = .progress) +map2_int <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) { + map2_("integer", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname map2 -map2_dbl <- function(.x, .y, .f, ..., .progress = FALSE) { - map2_("double", .x, .y, .f, ..., .progress = .progress) +map2_dbl <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) { + map2_("double", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname map2 -map2_chr <- function(.x, .y, .f, ..., .progress = FALSE) { - map2_("character", .x, .y, .f, ..., .progress = .progress) +map2_chr <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) { + map2_("character", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress) } 
map2_ <- function(.type, @@ -59,6 +59,7 @@ map2_ <- function(.type, .y, .f, ..., + .parallel = FALSE, .progress = FALSE, .purrr_user_env = caller_env(2), .purrr_error_call = caller_env()) { @@ -74,6 +75,14 @@ map2_ <- function(.type, .f <- as_mapper(.f, ...) + if (isTRUE(.parallel)) { + attributes(args) <- list( + class = "data.frame", + row.names = if (is.null(names)) .set_row_names(n) else names + ) + return(mmap_(args, .f, .progress, .type, .purrr_error_call, ...)) + } + i <- 0L with_indexed_errors( i = i, @@ -85,14 +94,14 @@ map2_ <- function(.type, #' @rdname map2 #' @export -map2_vec <- function(.x, .y, .f, ..., .ptype = NULL, .progress = FALSE) { - out <- map2(.x, .y, .f, ..., .progress = .progress) +map2_vec <- function(.x, .y, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) { + out <- map2(.x, .y, .f, ..., .parallel = .parallel, .progress = .progress) simplify_impl(out, ptype = .ptype) } #' @export #' @rdname map2 -walk2 <- function(.x, .y, .f, ..., .progress = FALSE) { - map2(.x, .y, .f, ..., .progress = .progress) +walk2 <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) { + map2(.x, .y, .f, ..., .parallel = .parallel, .progress = .progress) invisible(.x) } diff --git a/R/parallelization.R b/R/parallelization.R new file mode 100644 index 00000000..b88cff04 --- /dev/null +++ b/R/parallelization.R @@ -0,0 +1,110 @@ +#' Parallelization in purrr +#' +#' @description +#' purrr's map functions have a `.parallel` argument to parallelize a map using +#' the \CRANpkg{mirai} package. This allows you to run computations in parallel +#' using more cores on your machine, or distributed over the network. +#' +#' Parallelizing a map using 'n' processes does not automatically lead to it +#' taking 1/n of the time. Additional overhead from setting up the parallel task +#' and communicating with parallel processes eats into this benefit, and can +#' outweigh it for very short tasks or those involving large amounts of data. 
+#' The threshold at which parallelization becomes clearly beneficial will differ +#' according to your individual setup and task, but a rough guide would be in +#' the order of 100 microseconds to 1 millisecond for each map iteration. +#' +#' # Daemons settings +#' +#' How and where parallelization occurs is determined by +#' [mirai::daemons()]. This is a function from the \pkg{mirai} +#' package that sets up daemons (persistent background processes that receive +#' parallel computations) on your local machine or across the network. +#' +#' \pkg{purrr} requires these to be set up prior to performing any parallel map +#' operations. It is usual to set daemons once per session. You can leave them +#' running as they consume almost no resources whilst waiting to receive tasks. +#' The following sets up 6 daemons on your local machine: +#' +#' \preformatted{ +#' mirai::daemons(6) +#' } +#' +#' `daemons()` arguments: +#' +#' * `n`: the number of daemons to launch on your local machine, e.g. +#' `mirai::daemons(6)`. As a rule of thumb, for maximum efficiency this should +#' be (at most) one less than the number of cores on your machine, leaving one +#' core for the main R process. +#' * `url` and `remote`: used to set up and launch daemons for distributed +#' computing over the network. See [mirai::daemons] function documentation for +#' more details. +#' * None: calling `mirai::daemons()` with no arguments returns a summary of the +#' current connection status and mirai tasks. +#' +#' For details on further options, see [mirai::daemons]. +#' +#' Resetting daemons: +#' +#' Daemons persist for the duration of your session. To reset and terminate any +#' existing daemons: +#' +#' \preformatted{ +#' mirai::daemons(0) +#' } +#' +#' All daemons automatically terminate when your session ends and the connection +#' drops. Hence you do not need to explicitly terminate daemons in this instance, +#' although it is still good practice to do so. 
+#' +#' Note: it should always be for the user to set daemons. If you are using +#' parallel map within a package, do not make any [mirai::daemons()] calls +#' within the package. This helps prevent inadvertently spawning too many +#' daemons if functions are used recursively within each other. +#' +#' # Crating a function +#' +#' [carrier::crate()] provides a systematic way of making the function `.f` +#' self-contained so that it can be readily shared with other processes. +#' +#' Crating ensures that everything needed by the function is serialized along +#' with it, but not other objects which happen to be in the function's enclosing +#' environment. This helps to prevent inadvertently shipping large data objects +#' to daemons, where they are not needed. +#' +#' Any non-package function supplied to `.f` will be automatically crated. When +#' this happens, a confirmation along with the crate size is printed to the +#' console. Package functions are not crated as these are already +#' self-contained. +#' +#' If your function `.f()` contains free variables, for example it references +#' other local functions in its body, then explicitly [carrier::crate()] your +#' function supplying these variables to its `...` argument. This ensures that +#' these objects are available to `.f()` when it is executed in a parallel +#' process. +#' +#' Examples: +#' \preformatted{ +#' # package functions are not auto-crated: +#' map(1:3, stats::runif, .parallel = TRUE) +#' +#' # other functions (incl. anonymous functions) are auto-crated: +#' mtcars |> map_dbl(function(...) sum(...), .parallel = TRUE) +#' +#' # explicitly crate a function to include other objects required by it: +#' fun <- function(x) \{x + x \%\% 2 \} +#' map(1:3, carrier::crate(function(x) x + fun(x), fun = fun), .parallel = TRUE) +#' } +#' +#' For details on further options, see [carrier::crate]. 
+#' +#' # Further documentation +#' +#' \pkg{purrr}'s parallelization is powered by \CRANpkg{mirai}. See the +#' [mirai introduction and reference](https://shikokuchuo.net/mirai/articles/mirai.html) +#' for more details. +#' +#' Crating is provided by the \CRANpkg{carrier} package. See the +#' [carrier readme](https://github.com/r-lib/carrier) for more details. +#' +#' @name parallelization +NULL diff --git a/R/pmap.R b/R/pmap.R index 841818bc..064b0c37 100644 --- a/R/pmap.R +++ b/R/pmap.R @@ -92,35 +92,36 @@ #' pmin(df$x, df$y) #' map2_dbl(df$x, df$y, min) #' pmap_dbl(df, min) -pmap <- function(.l, .f, ..., .progress = FALSE) { - pmap_("list", .l, .f, ..., .progress = .progress) +pmap <- function(.l, .f, ..., .parallel = FALSE, .progress = FALSE) { + pmap_("list", .l, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname pmap -pmap_lgl <- function(.l, .f, ..., .progress = FALSE) { - pmap_("logical", .l, .f, ..., .progress = .progress) +pmap_lgl <- function(.l, .f, ..., .parallel = FALSE, .progress = FALSE) { + pmap_("logical", .l, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname pmap -pmap_int <- function(.l, .f, ..., .progress = FALSE) { - pmap_("integer", .l, .f, ..., .progress = .progress) +pmap_int <- function(.l, .f, ..., .parallel = FALSE, .progress = FALSE) { + pmap_("integer", .l, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname pmap -pmap_dbl <- function(.l, .f, ..., .progress = FALSE) { - pmap_("double", .l, .f, ..., .progress = .progress) +pmap_dbl <- function(.l, .f, ..., .parallel = FALSE, .progress = FALSE) { + pmap_("double", .l, .f, ..., .parallel = .parallel, .progress = .progress) } #' @export #' @rdname pmap -pmap_chr <- function(.l, .f, ..., .progress = FALSE) { - pmap_("character", .l, .f, ..., .progress = .progress) +pmap_chr <- function(.l, .f, ..., .parallel = FALSE, .progress = FALSE) { + pmap_("character", .l, .f, ..., .parallel = .parallel, .progress = 
.progress) } pmap_ <- function(.type, .l, .f, ..., + .parallel = FALSE, .progress = FALSE, .purrr_user_env = caller_env(2), .purrr_error_call = caller_env()) { @@ -138,6 +139,15 @@ pmap_ <- function(.type, .f <- as_mapper(.f, ...) + if (isTRUE(.parallel)) { + attributes(.l) <- list( + names = names(.l), + class = "data.frame", + row.names = if (is.null(names)) .set_row_names(n) else names + ) + return(mmap_(.l, .f, .progress, .type, .purrr_error_call, ...)) + } + call_names <- names(.l) call_n <- length(.l) @@ -153,16 +163,16 @@ pmap_ <- function(.type, #' @export #' @rdname pmap -pmap_vec <- function(.l, .f, ..., .ptype = NULL, .progress = FALSE) { +pmap_vec <- function(.l, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) { .f <- as_mapper(.f, ...) - out <- pmap(.l, .f, ..., .progress = .progress) + out <- pmap(.l, .f, ..., .parallel = .parallel, .progress = .progress) simplify_impl(out, ptype = .ptype) } #' @export #' @rdname pmap -pwalk <- function(.l, .f, ..., .progress = FALSE) { - pmap(.l, .f, ..., .progress = .progress) +pwalk <- function(.l, .f, ..., .parallel = FALSE, .progress = FALSE) { + pmap(.l, .f, ..., .parallel = .parallel, .progress = .progress) invisible(.l) } diff --git a/_pkgdown.yml b/_pkgdown.yml index e553a0a3..2205d651 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -115,3 +115,4 @@ reference: - array_tree - rate-helpers - progress_bars + - parallelization diff --git a/man/map.Rd b/man/map.Rd index 1903b700..10fea36b 100644 --- a/man/map.Rd +++ b/man/map.Rd @@ -10,19 +10,19 @@ \alias{walk} \title{Apply a function to each element of a vector} \usage{ -map(.x, .f, ..., .progress = FALSE) +map(.x, .f, ..., .parallel = FALSE, .progress = FALSE) -map_lgl(.x, .f, ..., .progress = FALSE) +map_lgl(.x, .f, ..., .parallel = FALSE, .progress = FALSE) -map_int(.x, .f, ..., .progress = FALSE) +map_int(.x, .f, ..., .parallel = FALSE, .progress = FALSE) -map_dbl(.x, .f, ..., .progress = FALSE) +map_dbl(.x, .f, ..., .parallel = FALSE, .progress 
= FALSE) -map_chr(.x, .f, ..., .progress = FALSE) +map_chr(.x, .f, ..., .parallel = FALSE, .progress = FALSE) -map_vec(.x, .f, ..., .ptype = NULL, .progress = FALSE) +map_vec(.x, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) -walk(.x, .f, ..., .progress = FALSE) +walk(.x, .f, ..., .parallel = FALSE, .progress = FALSE) } \arguments{ \item{.x}{A list or atomic vector.} @@ -54,6 +54,20 @@ x |> map(\\(x) f(x, 1, 2, collapse = ",")) This makes it easier to understand which arguments belong to which function and will tend to yield better error messages.} +\item{.parallel}{\ifelse{html}{\href{https://lifecycle.r-lib.org/articles/stages.html#experimental}{\figure{lifecycle-experimental.svg}{options: alt='[Experimental]'}}}{\strong{[Experimental]}} Whether to map in +parallel. Use \code{TRUE} to parallelize using the \CRANpkg{mirai} package. +\itemize{ +\item Set up parallelization in your session beforehand using +\code{\link[mirai:daemons]{mirai::daemons()}}. +\item Non-package functions are auto-crated for sharing with parallel +processes. You may \code{\link[carrier:crate]{carrier::crate()}} your function explicitly if you need +to supply additional objects along with your function. +\item Use of \code{...} is not permitted in this context, \code{\link[carrier:crate]{carrier::crate()}} an +anonymous function instead. +} + +See \link{parallelization} for more details.} + \item{.progress}{Whether to show a progress bar. 
Use \code{TRUE} to turn on a basic progress bar, use a string to give it a name, or see \link{progress_bars} for more details.} @@ -149,6 +163,21 @@ mtcars |> map(\(df) lm(mpg ~ wt, data = df)) |> map(summary) |> map_dbl("r.squared") + +\dontshow{if (interactive() && requireNamespace("mirai", quietly = TRUE) && requireNamespace("carrier", quietly = TRUE)) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} +# Run in interactive sessions only as spawns additional processes + +# To use parallelized map, set daemons (number of parallel processes) first: +mirai::daemons(2) + +mtcars |> map_dbl(sum, .parallel = TRUE) + +1:10 |> + map(function(x) stats::rnorm(10, mean = x), .parallel = TRUE) |> + map_dbl(mean, .parallel = TRUE) + +mirai::daemons(0) +\dontshow{\}) # examplesIf} } \seealso{ \code{\link[=map_if]{map_if()}} for applying a function to only those elements diff --git a/man/map2.Rd b/man/map2.Rd index 2040c89c..977ccd38 100644 --- a/man/map2.Rd +++ b/man/map2.Rd @@ -10,19 +10,19 @@ \alias{walk2} \title{Map over two inputs} \usage{ -map2(.x, .y, .f, ..., .progress = FALSE) +map2(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) -map2_lgl(.x, .y, .f, ..., .progress = FALSE) +map2_lgl(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) -map2_int(.x, .y, .f, ..., .progress = FALSE) +map2_int(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) -map2_dbl(.x, .y, .f, ..., .progress = FALSE) +map2_dbl(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) -map2_chr(.x, .y, .f, ..., .progress = FALSE) +map2_chr(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) -map2_vec(.x, .y, .f, ..., .ptype = NULL, .progress = FALSE) +map2_vec(.x, .y, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) -walk2(.x, .y, .f, ..., .progress = FALSE) +walk2(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) } \arguments{ \item{.x, .y}{A pair of vectors, usually the same length. 
If not, a vector @@ -52,6 +52,20 @@ x |> map(\\(x) f(x, 1, 2, collapse = ",")) This makes it easier to understand which arguments belong to which function and will tend to yield better error messages.} +\item{.parallel}{\ifelse{html}{\href{https://lifecycle.r-lib.org/articles/stages.html#experimental}{\figure{lifecycle-experimental.svg}{options: alt='[Experimental]'}}}{\strong{[Experimental]}} Whether to map in +parallel. Use \code{TRUE} to parallelize using the \CRANpkg{mirai} package. +\itemize{ +\item Set up parallelization in your session beforehand using +\code{\link[mirai:daemons]{mirai::daemons()}}. +\item Non-package functions are auto-crated for sharing with parallel +processes. You may \code{\link[carrier:crate]{carrier::crate()}} your function explicitly if you need +to supply additional objects along with your function. +\item Use of \code{...} is not permitted in this context, \code{\link[carrier:crate]{carrier::crate()}} an +anonymous function instead. +} + +See \link{parallelization} for more details.} + \item{.progress}{Whether to show a progress bar. Use \code{TRUE} to turn on a basic progress bar, use a string to give it a name, or see \link{progress_bars} for more details.} diff --git a/man/map_if.Rd b/man/map_if.Rd index c16b0c10..79c8e271 100644 --- a/man/map_if.Rd +++ b/man/map_if.Rd @@ -7,7 +7,7 @@ \usage{ map_if(.x, .p, .f, ..., .else = NULL) -map_at(.x, .at, .f, ..., .progress = FALSE) +map_at(.x, .at, .f, ..., .parallel = FALSE, .progress = FALSE) } \arguments{ \item{.x}{A list or atomic vector.} @@ -57,6 +57,20 @@ and returns a logical, integer, or character vector of elements to select. installed, you can use \code{vars()} and tidyselect helpers to select elements.} +\item{.parallel}{\ifelse{html}{\href{https://lifecycle.r-lib.org/articles/stages.html#experimental}{\figure{lifecycle-experimental.svg}{options: alt='[Experimental]'}}}{\strong{[Experimental]}} Whether to map in +parallel. 
Use \code{TRUE} to parallelize using the \CRANpkg{mirai} package. +\itemize{ +\item Set up parallelization in your session beforehand using +\code{\link[mirai:daemons]{mirai::daemons()}}. +\item Non-package functions are auto-crated for sharing with parallel +processes. You may \code{\link[carrier:crate]{carrier::crate()}} your function explicitly if you need +to supply additional objects along with your function. +\item Use of \code{...} is not permitted in this context, \code{\link[carrier:crate]{carrier::crate()}} an +anonymous function instead. +} + +See \link{parallelization} for more details.} + \item{.progress}{Whether to show a progress bar. Use \code{TRUE} to turn on a basic progress bar, use a string to give it a name, or see \link{progress_bars} for more details.} diff --git a/man/parallelization.Rd b/man/parallelization.Rd new file mode 100644 index 00000000..8386e718 --- /dev/null +++ b/man/parallelization.Rd @@ -0,0 +1,112 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/parallelization.R +\name{parallelization} +\alias{parallelization} +\title{Parallelization in purrr} +\description{ +purrr's map functions have a \code{.parallel} argument to parallelize a map using +the \CRANpkg{mirai} package. This allows you to run computations in parallel +using more cores on your machine, or distributed over the network. + +Parallelizing a map using 'n' processes does not automatically lead to it +taking 1/n of the time. Additional overhead from setting up the parallel task +and communicating with parallel processes eats into this benefit, and can +outweigh it for very short tasks or those involving large amounts of data. +The threshold at which parallelization becomes clearly beneficial will differ +according to your individual setup and task, but a rough guide would be in +the order of 100 microseconds to 1 millisecond for each map iteration. 
+} +\section{Daemons settings}{ +How and where parallelization occurs is determined by +\code{\link[mirai:daemons]{mirai::daemons()}}. This is a function from the \pkg{mirai} +package that sets up daemons (persistent background processes that receive +parallel computations) on your local machine or across the network. + +\pkg{purrr} requires these to be set up prior to performing any parallel map +operations. It is usual to set daemons once per session. You can leave them +running as they consume almost no resources whilst waiting to receive tasks. +The following sets up 6 daemons on your local machine: + +\preformatted{ +mirai::daemons(6) +} + +\code{daemons()} arguments: +\itemize{ +\item \code{n}: the number of daemons to launch on your local machine, e.g. +\code{mirai::daemons(6)}. As a rule of thumb, for maximum efficiency this should +be (at most) one less than the number of cores on your machine, leaving one +core for the main R process. +\item \code{url} and \code{remote}: used to set up and launch daemons for distributed +computing over the network. See \link[mirai:daemons]{mirai::daemons} function documentation for +more details. +\item None: calling \code{mirai::daemons()} with no arguments returns a summary of the +current connection status and mirai tasks. +} + +For details on further options, see \link[mirai:daemons]{mirai::daemons}. + +Resetting daemons: + +Daemons persist for the duration of your session. To reset and terminate any +existing daemons: + +\preformatted{ +mirai::daemons(0) +} + +All daemons automatically terminate when your session ends and the connection +drops. Hence you do not need to explicitly terminate daemons in this instance, +although it is still good practice to do so. + +Note: it should always be for the user to set daemons. If you are using +parallel map within a package, do not make any \code{\link[mirai:daemons]{mirai::daemons()}} calls +within the package. 
This helps prevent inadvertently spawning too many +daemons if functions are used recursively within each other. +} + +\section{Crating a function}{ +\code{\link[carrier:crate]{carrier::crate()}} provides a systematic way of making the function \code{.f} +self-contained so that it can be readily shared with other processes. + +Crating ensures that everything needed by the function is serialized along +with it, but not other objects which happen to be in the function's enclosing +environment. This helps to prevent inadvertently shipping large data objects +to daemons, where they are not needed. + +Any non-package function supplied to \code{.f} will be automatically crated. When +this happens, a confirmation along with the crate size is printed to the +console. Package functions are not crated as these are already +self-contained. + +If your function \code{.f()} contains free variables, for example it references +other local functions in its body, then explicitly \code{\link[carrier:crate]{carrier::crate()}} your +function supplying these variables to its \code{...} argument. This ensures that +these objects are available to \code{.f()} when it is executed in a parallel +process. + +Examples: +\preformatted{ +# package functions are not auto-crated: +map(1:3, stats::runif, .parallel = TRUE) + +# other functions (incl. anonymous functions) are auto-crated: +mtcars |> map_dbl(function(...) sum(...), .parallel = TRUE) + +# explicitly crate a function to include other objects required by it: +fun <- function(x) \{x + x \%\% 2 \} +map(1:3, carrier::crate(function(x) x + fun(x), fun = fun), .parallel = TRUE) +} + +For details on further options, see \link[carrier:crate]{carrier::crate}. +} + +\section{Further documentation}{ +\pkg{purrr}'s parallelization is powered by \CRANpkg{mirai}. See the +\href{https://shikokuchuo.net/mirai/articles/mirai.html}{mirai introduction and reference} +for more details. + +Crating is provided by the \CRANpkg{carrier} package. 
See the +\href{https://github.com/r-lib/carrier}{carrier readme} for more details. +} + diff --git a/man/pmap.Rd b/man/pmap.Rd index 65babeef..71a43232 100644 --- a/man/pmap.Rd +++ b/man/pmap.Rd @@ -10,19 +10,19 @@ \alias{pwalk} \title{Map over multiple input simultaneously (in "parallel")} \usage{ -pmap(.l, .f, ..., .progress = FALSE) +pmap(.l, .f, ..., .parallel = FALSE, .progress = FALSE) -pmap_lgl(.l, .f, ..., .progress = FALSE) +pmap_lgl(.l, .f, ..., .parallel = FALSE, .progress = FALSE) -pmap_int(.l, .f, ..., .progress = FALSE) +pmap_int(.l, .f, ..., .parallel = FALSE, .progress = FALSE) -pmap_dbl(.l, .f, ..., .progress = FALSE) +pmap_dbl(.l, .f, ..., .parallel = FALSE, .progress = FALSE) -pmap_chr(.l, .f, ..., .progress = FALSE) +pmap_chr(.l, .f, ..., .parallel = FALSE, .progress = FALSE) -pmap_vec(.l, .f, ..., .ptype = NULL, .progress = FALSE) +pmap_vec(.l, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) -pwalk(.l, .f, ..., .progress = FALSE) +pwalk(.l, .f, ..., .parallel = FALSE, .progress = FALSE) } \arguments{ \item{.l}{A list of vectors. The length of \code{.l} determines the number of @@ -58,6 +58,20 @@ x |> map(\\(x) f(x, 1, 2, collapse = ",")) This makes it easier to understand which arguments belong to which function and will tend to yield better error messages.} +\item{.parallel}{\ifelse{html}{\href{https://lifecycle.r-lib.org/articles/stages.html#experimental}{\figure{lifecycle-experimental.svg}{options: alt='[Experimental]'}}}{\strong{[Experimental]}} Whether to map in +parallel. Use \code{TRUE} to parallelize using the \CRANpkg{mirai} package. +\itemize{ +\item Set up parallelization in your session beforehand using +\code{\link[mirai:daemons]{mirai::daemons()}}. +\item Non-package functions are auto-crated for sharing with parallel +processes. You may \code{\link[carrier:crate]{carrier::crate()}} your function explicitly if you need +to supply additional objects along with your function. 
+\item Use of \code{...} is not permitted in this context, \code{\link[carrier:crate]{carrier::crate()}} an +anonymous function instead. +} + +See \link{parallelization} for more details.} + \item{.progress}{Whether to show a progress bar. Use \code{TRUE} to turn on a basic progress bar, use a string to give it a name, or see \link{progress_bars} for more details.} diff --git a/purrr.Rproj b/purrr.Rproj index cba1b6b7..c968e985 100644 --- a/purrr.Rproj +++ b/purrr.Rproj @@ -1,4 +1,5 @@ Version: 1.0 +ProjectId: 7599ac37-3c41-46be-af39-1e11a14300d2 RestoreWorkspace: No SaveWorkspace: No diff --git a/tests/testthat/_snaps/parallel.md b/tests/testthat/_snaps/parallel.md new file mode 100644 index 00000000..4d07be21 --- /dev/null +++ b/tests/testthat/_snaps/parallel.md @@ -0,0 +1,168 @@ +# Can't parallel map without first setting daemons + + Code + map(list(x = 1, y = 2), list, .parallel = TRUE) + Condition + Error in `map()`: + ! No daemons set - use e.g. `mirai::daemons(6)` to set 6 local daemons. + +# Can't use `...` in a parallel map + + Code + map(list(x = 1, y = 2), list, a = "wrong", .parallel = TRUE) + Condition + Error in `map()`: + ! Don't use `...` with `.parallel = TRUE`. + +# all inform about location of problem + + Code + map_int(1:3, carrier::crate(function(x, bad = 2:1) if (x == 3) bad else x), + .parallel = TRUE) + Condition + Error in `map_int()`: + ! `x[[3]]` must have size 1, not size 2. + Code + map_int(1:3, carrier::crate(function(x, bad = "x") if (x == 3) bad else x), + .parallel = TRUE) + Condition + Error in `map_int()`: + ! Can't convert `<list>[[3]]` <character> to <integer>. + Code + map(1:3, carrier::crate(function(x, bad = stop("Doesn't work")) if (x == + 3) bad else x), .parallel = TRUE) + Condition + Error in `map()`: + i In index: 3. + Caused by error: + !
Doesn't work + +# error location uses name if present + + Code + map_int(c(a = 1, b = 2, c = 3), carrier::crate(function(x, bad = stop( + "Doesn't work")) if (x == 3) bad else x), .parallel = TRUE) + Condition + Error in `map_int()`: + i In index: 3. + i With name: c. + Caused by error: + ! Doesn't work + Code + map_int(c(a = 1, b = 2, 3), carrier::crate(function(x, bad = stop( + "Doesn't work")) if (x == 3) bad else x), .parallel = TRUE) + Condition + Error in `map_int()`: + i In index: 3. + Caused by error: + ! Doesn't work + +# requires output be length 1 and have common type + + Code + map_vec(1:2, ~ rep(1, .x), .parallel = TRUE) + Condition + Error in `map_vec()`: + ! `out[[2]]` must have size 1, not size 2. + Code + map_vec(1:2, ~ if (.x == 1) factor("x") else 1, .parallel = TRUE) + Condition + Error in `map_vec()`: + ! Can't combine `<list>[[1]]` <factor<bae1f>> and `<list>[[2]]` <double>. + +# can enforce .ptype + + Code + map_vec(1:2, ~ factor("x"), .ptype = integer(), .parallel = TRUE) + Condition + Error in `map_vec()`: + ! Can't convert `<list>[[1]]` <factor<bae1f>> to <integer>. + +# verifies result types and length + + Code + map2_int(1, 1, ~"x", .parallel = TRUE) + Condition + Error in `map2_int()`: + ! Can't convert `<list>[[1]]` <character> to <integer>. + Code + map2_int(1, 1, ~ 1:2, .parallel = TRUE) + Condition + Error in `map2_int()`: + ! `x[[1]]` must have size 1, not size 2. + Code + map2_vec(1, 1, ~1, .ptype = character(), .parallel = TRUE) + Condition + Error in `map2_vec()`: + ! Can't convert `<list>[[1]]` <double> to <character>. + +--- + + Code + pmap_int(list(1), ~"x", .parallel = TRUE) + Condition + Error in `pmap_int()`: + ! Can't convert `<list>[[1]]` <character> to <integer>. + Code + pmap_int(list(1), ~ 1:2, .parallel = TRUE) + Condition + Error in `pmap_int()`: + ! `x[[1]]` must have size 1, not size 2. + Code + pmap_vec(list(1), ~1, .ptype = character(), .parallel = TRUE) + Condition + Error in `pmap_vec()`: + ! Can't convert `<list>[[1]]` <double> to <character>. + +# requires vector inputs + + Code + map2(environment(), "a", identity, .parallel = TRUE) + Condition + Error in `map2()`: + !
`.x` must be a vector, not an environment. + Code + map2("a", environment(), "a", identity, .parallel = TRUE) + Condition + Error in `map2()`: + ! `.y` must be a vector, not an environment. + +# recycles inputs + + Code + map2(1:2, 1:3, `+`, .parallel = TRUE) + Condition + Error in `map2()`: + ! Can't recycle `.x` (size 2) to match `.y` (size 3). + Code + map2(1:2, integer(), `+`, .parallel = TRUE) + Condition + Error in `map2()`: + ! Can't recycle `.x` (size 2) to match `.y` (size 0). + +--- + + Code + pmap(list(1:2, 1:3), `+`, .parallel = TRUE) + Condition + Error in `pmap()`: + ! Can't recycle `.l[[1]]` (size 2) to match `.l[[2]]` (size 3). + Code + pmap(list(1:2, integer()), `+`, .parallel = TRUE) + Condition + Error in `pmap()`: + ! Can't recycle `.l[[1]]` (size 2) to match `.l[[2]]` (size 0). + +# requires list of vectors + + Code + pmap(environment(), identity) + Condition + Error in `pmap()`: + ! `.l` must be a list, not an environment. + Code + pmap(list(environment()), identity) + Condition + Error in `pmap()`: + ! `.l[[1]]` must be a vector, not an environment. 
+
diff --git a/tests/testthat/test-parallel.R b/tests/testthat/test-parallel.R
new file mode 100644
index 00000000..f1add1dc
--- /dev/null
+++ b/tests/testthat/test-parallel.R
@@ -0,0 +1,386 @@
+skip_if_not_installed("mirai")
+skip_if_not_installed("carrier")
+
+test_that("Can't parallel map without first setting daemons", {
+  expect_snapshot(error = TRUE, {
+    map(list(x = 1, y = 2), list, .parallel = TRUE)
+  })
+})
+
+# set up daemons for the tests below; the test above must run before any are set
+mirai::daemons(1, dispatcher = FALSE) # ensures only 1 additional process on CRAN
+on.exit(mirai::daemons(0), add = TRUE)
+
+test_that("Can't use `...` in a parallel map", {
+  expect_snapshot(error = TRUE, {
+    map(list(x = 1, y = 2), list, a = "wrong", .parallel = TRUE)
+  })
+})
+
+# map -----------------------------------------------------------------------
+
+test_that("preserves names", {
+  out <- map(list(x = 1, y = 2), identity, .parallel = TRUE)
+  expect_equal(names(out), c("x", "y"))
+})
+
+test_that("works with matrices/arrays (#970)", {
+  expect_identical(
+    map_int(matrix(1:4, nrow = 2), identity, .parallel = TRUE),
+    1:4
+  )
+})
+
+test_that("all inform about location of problem", {
+  skip_if_not_installed("carrier")
+
+  expect_snapshot(error = TRUE, {
+    map_int(1:3, carrier::crate(function(x, bad = 2:1) if (x == 3) bad else x), .parallel = TRUE)
+    map_int(1:3, carrier::crate(function(x, bad = "x") if (x == 3) bad else x), .parallel = TRUE)
+    map(1:3, carrier::crate(function(x, bad = stop("Doesn't work")) if (x == 3) bad else x), .parallel = TRUE)
+  })
+
+  cnd <- catch_cnd(map(1:3, carrier::crate(function(x, bad = stop("Doesn't work")) if (x == 3) bad else x), .parallel = TRUE))
+  expect_s3_class(cnd, "purrr_error_indexed")
+  expect_equal(cnd$location, 3)
+  expect_equal(cnd$name, NULL)
+})
+
+test_that("error location uses name if present", {
+  skip_if_not_installed("carrier")
+
+  expect_snapshot(error = TRUE, {
+    map_int(c(a = 1, b = 2, c = 3), carrier::crate(function(x, bad = stop("Doesn't work")) if (x == 3) bad else x), .parallel = TRUE)
+    map_int(c(a = 1, b = 2, 3), carrier::crate(function(x, bad = stop("Doesn't work")) if (x == 3) bad else x), .parallel = TRUE)
+  })
+
+  cnd <- catch_cnd(map(c(1, 2, c = 3), carrier::crate(function(x, bad = stop("Doesn't work")) if (x == 3) bad else x), .parallel = TRUE))
+  expect_s3_class(cnd, "purrr_error_indexed")
+  expect_equal(cnd$location, 3)
+  expect_equal(cnd$name, "c")
+})
+
+test_that("0 length input gives 0 length output", {
+  expect_equal(map(list(), identity, .parallel = TRUE), list())
+  expect_equal(map(NULL, identity, .parallel = TRUE), list())
+
+  expect_equal(map_lgl(NULL, identity, .parallel = TRUE), logical())
+})
+
+test_that("map() always returns a list", {
+  expect_bare(map(mtcars, mean, .parallel = TRUE), "list")
+})
+
+test_that("types automatically coerced correctly", {
+  expect_identical(map_lgl(c(NA, 0, 1), identity, .parallel = TRUE), c(NA, FALSE, TRUE))
+
+  expect_identical(map_int(c(NA, FALSE, TRUE), identity, .parallel = TRUE), c(NA, 0L, 1L))
+  expect_identical(map_int(c(NA, 1, 2), identity, .parallel = TRUE), c(NA, 1L, 2L))
+
+  expect_identical(map_dbl(c(NA, FALSE, TRUE), identity, .parallel = TRUE), c(NA, 0, 1))
+  expect_identical(map_dbl(c(NA, 1L, 2L), identity, .parallel = TRUE), c(NA, 1, 2))
+
+  expect_identical(map_chr(NA, identity, .parallel = TRUE), NA_character_)
+})
+
+test_that("logical and integer NA become correct double NA", {
+  expect_identical(
+    map_dbl(list(NA, NA_integer_), identity, .parallel = TRUE),
+    c(NA_real_, NA_real_)
+  )
+})
+
+test_that("map forces arguments in same way as base R", {
+  f_map <- map(1:2, function(i) function(x) x + i, .parallel = TRUE)
+  f_base <- lapply(1:2, function(i) function(x) x + i)
+
+  expect_equal(f_map[[1]](0), f_base[[1]](0))
+  expect_equal(f_map[[2]](0), f_base[[2]](0))
+})
+
+test_that("primitive dispatch correctly", {
+  skip_if_not_installed("carrier")
+
+  method <- function(x) "dispatched!"
+  x <- structure(list(), class = "test_class")
+  expect_identical(
+    map(
+      list(x, x),
+      carrier::crate(function(x) as.character(x), as.character.test_class = method),
+      .parallel = TRUE
+    ),
+    list("dispatched!", "dispatched!")
+  )
+})
+
+test_that("map() with empty input copies names", {
+  named_list <- named(list())
+  expect_identical(map(named_list, identity, .parallel = TRUE), named(list()))
+  expect_identical(map_lgl(named_list, identity, .parallel = TRUE), named(lgl()))
+  expect_identical(map_int(named_list, identity, .parallel = TRUE), named(int()))
+  expect_identical(map_dbl(named_list, identity, .parallel = TRUE), named(dbl()))
+  expect_identical(map_chr(named_list, identity, .parallel = TRUE), named(chr()))
+})
+
+# map_vec ------------------------------------------------------------------
+
+test_that("still iterates using [[", {
+  df <- data.frame(x = 1, y = 2, z = 3)
+  expect_equal(map_vec(df, length, .parallel = TRUE), c(x = 1, y = 1, z = 1))
+})
+
+test_that("requires output be length 1 and have common type", {
+  expect_snapshot(error = TRUE, {
+    map_vec(1:2, ~ rep(1, .x), .parallel = TRUE)
+    map_vec(1:2, ~ if (.x == 1) factor("x") else 1, .parallel = TRUE)
+  })
+})
+
+test_that("row-binds data frame output", {
+  out <- map_vec(1:2, ~ data.frame(x = .x), .parallel = TRUE)
+  expect_equal(out, data.frame(x = 1:2))
+})
+
+test_that("concatenates list output", {
+  out <- map_vec(1:2, ~ list(.x), .parallel = TRUE)
+  expect_equal(out, list(1, 2))
+})
+
+test_that("can enforce .ptype", {
+  expect_snapshot(error = TRUE, {
+    map_vec(1:2, ~ factor("x"), .ptype = integer(), .parallel = TRUE)
+  })
+})
+
+# map2 ---------------------------------------------------------------------
+
+test_that("x and y mapped to first and second argument", {
+  expect_equal(map2(1, 2, function(x, y) x, .parallel = TRUE), list(1))
+  expect_equal(map2(1, 2, function(x, y) y, .parallel = TRUE), list(2))
+})
+
+test_that("variants return expected types", {
+  x <- list(1, 2, 3)
+  expect_true(is_bare_list(map2(x, 0, ~ 1, .parallel = TRUE)))
+  expect_true(is_bare_logical(map2_lgl(x, 0, ~ TRUE, .parallel = TRUE)))
+  expect_true(is_bare_integer(map2_int(x, 0, ~ 1, .parallel = TRUE)))
+  expect_true(is_bare_double(map2_dbl(x, 0, ~ 1.5, .parallel = TRUE)))
+  expect_true(is_bare_character(map2_chr(x, 0, ~ "x", .parallel = TRUE)))
+  expect_equal(walk2(x, 0, ~ "x", .parallel = TRUE), x)
+
+  x <- list(FALSE, 1L, 1)
+  expect_true(is_bare_double(map2_vec(x, 0, ~ .x, .parallel = TRUE)))
+})
+
+test_that("0 length input gives 0 length output", {
+  expect_equal(map2(list(), list(), identity, .parallel = TRUE), list())
+  expect_equal(map2(NULL, NULL, identity, .parallel = TRUE), list())
+
+  expect_equal(map2_lgl(NULL, NULL, identity, .parallel = TRUE), logical())
+})
+
+test_that("verifies result types and length", {
+  expect_snapshot(error = TRUE, {
+    map2_int(1, 1, ~ "x", .parallel = TRUE)
+    map2_int(1, 1, ~ 1:2, .parallel = TRUE)
+    map2_vec(1, 1, ~ 1, .ptype = character(), .parallel = TRUE)
+  })
+})
+
+test_that("works with vctrs records (#963)", {
+  x <- new_rcrd(list(x = c(1, 2), y = c("a", "b")))
+  out <- list(new_rcrd(list(x = 1, y = "a")), new_rcrd(list(x = 2, y = "b")))
+  expect_identical(map2(x, 1, ~ .x, .parallel = TRUE), out)
+})
+
+test_that("requires vector inputs", {
+  expect_snapshot(error = TRUE, {
+    map2(environment(), "a", identity, .parallel = TRUE)
+    map2("a", environment(), "a", identity, .parallel = TRUE)
+  })
+})
+
+test_that("recycles inputs", {
+  expect_equal(map2(1:2, 1, `+`, .parallel = TRUE), list(2, 3))
+  expect_equal(map2(integer(), 1, `+`, .parallel = TRUE), list())
+  expect_equal(map2(NULL, 1, `+`, .parallel = TRUE), list())
+
+  expect_snapshot(error = TRUE, {
+    map2(1:2, 1:3, `+`, .parallel = TRUE)
+    map2(1:2, integer(), `+`, .parallel = TRUE)
+  })
+})
+
+test_that("only takes names from x", {
+  x1 <- 1:2
+  x2 <- set_names(x1, letters[1:2])
+  x3 <- set_names(x1, "")
+
+  expect_named(map2(x1, 1, `+`, .parallel = TRUE), NULL)
+  expect_named(map2(x2, 1, `+`, .parallel = TRUE), c("a", "b"))
+  expect_named(map2(x3, 1, `+`, .parallel = TRUE), c("", ""))
+
+  # recycling them if needed (#779)
+  x4 <- c(a = 1)
+  expect_named(map2(x4, 1:2, `+`, .parallel = TRUE), c("a", "a"))
+})
+
+test_that("don't evaluate symbolic objects (#428)", {
+  map2(exprs(1 + 2), NA, ~ testthat::expect_identical(.x, quote(1 + 2)), .parallel = TRUE)
+  walk2(exprs(1 + 2), NA, ~ testthat::expect_identical(.x, quote(1 + 2)), .parallel = TRUE)
+  expect_true(TRUE) # so the test is not deemed empty and skipped
+})
+
+# pmap ----------------------------------------------------------------------
+
+test_that(".f called with named arguments", {
+  x <- list(x = 1, 2, y = 3)
+  expect_equal(pmap(x, list, .parallel = TRUE), list(x))
+})
+
+# no longer tested as `...` are forbidden when `.parallel = TRUE`
+#test_that("... are passed after varying arguments", {
+#  out <- pmap(list(x = 1:2), list, n = 1:2, .parallel = TRUE)
+#  expect_equal(out, list(
+#    list(x = 1, n = 1:2),
+#    list(x = 2, n = 1:2)
+#  ))
+#})
+
+test_that("variants return expected types", {
+  l <- list(list(1, 2, 3))
+  expect_true(is_bare_list(pmap(l, ~ 1, .parallel = TRUE)))
+  expect_true(is_bare_logical(pmap_lgl(l, ~ TRUE, .parallel = TRUE)))
+  expect_true(is_bare_integer(pmap_int(l, ~ 1, .parallel = TRUE)))
+  expect_true(is_bare_double(pmap_dbl(l, ~ 1.5, .parallel = TRUE)))
+  expect_true(is_bare_character(pmap_chr(l, ~ "x", .parallel = TRUE)))
+  expect_equal(pwalk(l, ~ "x", .parallel = TRUE), l)
+
+  l <- list(list(FALSE, 1L, 1))
+  expect_true(is_bare_double(pmap_vec(l, ~ .x, .parallel = TRUE)))
+})
+
+test_that("verifies result types and length", {
+  expect_snapshot(error = TRUE, {
+    pmap_int(list(1), ~ "x", .parallel = TRUE)
+    pmap_int(list(1), ~ 1:2, .parallel = TRUE)
+    pmap_vec(list(1), ~ 1, .ptype = character(), .parallel = TRUE)
+  })
+})
+
+test_that("0 length input gives 0 length output", {
+  expect_equal(pmap(list(list(), list()), identity, .parallel = TRUE), list())
+  expect_equal(pmap(list(NULL, NULL), identity, .parallel = TRUE), list())
+  expect_equal(pmap(list(), identity, .parallel = TRUE), list())
+  expect_equal(pmap(NULL, identity, .parallel = TRUE), list())
+
+  expect_equal(pmap_lgl(NULL, identity, .parallel = TRUE), logical())
+})
+
+
+test_that("requires list of vectors", {
+  # NOTE(review): these calls omit `.parallel = TRUE`; the recorded snapshot pins this exact code
+  expect_snapshot(error = TRUE, {
+    pmap(environment(), identity)
+    pmap(list(environment()), identity)
+  })
+})
+
+test_that("recycles inputs", {
+  expect_equal(pmap(list(1:2, 1), `+`, .parallel = TRUE), list(2, 3))
+  expect_equal(pmap(list(integer(), 1), `+`, .parallel = TRUE), list())
+  expect_equal(pmap(list(NULL, 1), `+`, .parallel = TRUE), list())
+
+  expect_snapshot(error = TRUE, {
+    pmap(list(1:2, 1:3), `+`, .parallel = TRUE)
+    pmap(list(1:2, integer()), `+`, .parallel = TRUE)
+  })
+})
+
+test_that("only takes names from x", {
+  x1 <- 1:2
+  x2 <- set_names(x1, letters[1:2])
+  x3 <- set_names(x1, "")
+
+  expect_named(pmap(list(x1, x2), `+`, .parallel = TRUE), NULL)
+  expect_named(pmap(list(x2, x2), `+`, .parallel = TRUE), c("a", "b"))
+  expect_named(pmap(list(x3, x2), `+`, .parallel = TRUE), c("", ""))
+
+  # recycling them if needed (#779)
+  x4 <- c(a = 1)
+  expect_named(pmap(list(x4, 1:2), `+`, .parallel = TRUE), c("a", "a"))
+})
+
+test_that("avoid expensive [[ method on data frames", {
+  local_bindings(
+    `[[.mydf` = function(x, ...) stop("Not allowed!"),
+    .env = global_env()
+  )
+
+  df <- data.frame(x = 1:2, y = 2:1)
+  class(df) <- c("mydf", "data.frame")
+
+  # NOTE(review): `.parallel` is given a list here, while the docs describe `TRUE` - confirm this form is supported
+  expect_equal(pmap(df, list, .parallel = list(`[[.mydf` = `[[.mydf`)), list(list(x = 1, y = 2), list(x = 2, y = 1)))
+  expect_equal(pmap_lgl(df, ~ TRUE, .parallel = list(`[[.mydf` = `[[.mydf`)), c(TRUE, TRUE))
+  expect_equal(pmap_int(df, ~ 2, .parallel = list(`[[.mydf` = `[[.mydf`)), c(2, 2))
+  expect_equal(pmap_dbl(df, ~ 3.5, .parallel = list(`[[.mydf` = `[[.mydf`)), c(3.5, 3.5))
+  expect_equal(pmap_chr(df, ~ "x", .parallel = list(`[[.mydf` = `[[.mydf`)), c("x", "x"))
+})
+
+test_that("pmap works with empty lists", {
+  expect_identical(pmap(list(), ~ 1, .parallel = TRUE), list())
+})
+
+test_that("preserves S3 class of input vectors (#358)", {
+  date <- as.Date("2018-09-27")
+  expect_identical(pmap(list(date), identity, .parallel = TRUE), list(date))
+})
+
+test_that("works with vctrs records (#963)", {
+  x <- new_rcrd(list(x = c(1, 2), y = c("a", "b")))
+  out <- list(new_rcrd(list(x = 1, y = "a")), new_rcrd(list(x = 2, y = "b")))
+  expect_identical(pmap(list(x, 1, 1:2), ~ .x, .parallel = TRUE), out)
+})
+
+test_that("don't evaluate symbolic objects (#428)", {
+  pmap(list(exprs(1 + 2)), ~ testthat::expect_identical(.x, quote(1 + 2)), .parallel = TRUE)
+  pwalk(list(exprs(1 + 2)), ~ testthat::expect_identical(.x, quote(1 + 2)), .parallel = TRUE)
+  expect_true(TRUE) # so the test is not deemed empty and skipped
+})
+
+# imap ----------------------------------------------------------------------
+
+test_that("atomic vector imap works", {
+  x <- 1:3 %>% set_names()
+  expect_true(all(imap_lgl(x, `==`, .parallel = TRUE)))
+  expect_length(imap_chr(x, paste, .parallel = TRUE), 3)
+  expect_equal(imap_int(x, ~ .x + as.integer(.y), .parallel = TRUE), x * 2)
+  expect_equal(imap_dbl(x, ~ .x + as.numeric(.y), .parallel = TRUE), x * 2)
+  expect_equal(imap_vec(x, ~ .x + as.numeric(.y), .parallel = TRUE), x * 2)
+})
+
+# map_at --------------------------------------------------------------------
+
+test_that("map_at() works with tidyselect", {
+  skip_if_not_installed("tidyselect")
+  local_options(lifecycle_verbosity = "quiet")
+
+  x <- list(a = "b", b = "c", aa = "bb")
+  one <- map_at(x, vars(a), toupper, .parallel = TRUE)
+  expect_identical(one$a, "B")
+  expect_identical(one$aa, "bb")
+  two <- map_at(x, vars(tidyselect::contains("a")), toupper, .parallel = TRUE)
+  expect_identical(two$a, "B")
+  expect_identical(two$aa, "BB")
+})
+
+test_that("negative .at omits locations", {
+  x <- c(1, 2, 3)
+  out <- map_at(x, -1, ~ .x * 2, .parallel = TRUE)
+  expect_equal(out, list(1, 4, 6))
+})
+
+# ---------------------------------------------------------------------------
+
+mirai::daemons(0)