Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelized map using mirai #1163

Open
wants to merge 38 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
b2e3ad4
mirai map concept
shikokuchuo Jan 16, 2025
8d99e46
enable parallel for other map variants
shikokuchuo Jan 21, 2025
12c9ec9
implement parallel map2
shikokuchuo Jan 21, 2025
afb58fb
implement parallel pmap
shikokuchuo Jan 21, 2025
325eae1
rely on updated mirai for multiple map name preservation
shikokuchuo Jan 22, 2025
52b1b68
re-export mirai::daemons()
shikokuchuo Jan 22, 2025
e34b6dc
add parallelization docs
shikokuchuo Jan 22, 2025
1fd6a15
add parallel option for map_at()
shikokuchuo Jan 22, 2025
8273941
re-throw errors from mirai_map(); add tests
shikokuchuo Jan 22, 2025
44298f4
add parallelized examples to map(); document with() method for daemons
shikokuchuo Jan 22, 2025
0219bd8
update pkgdown template
shikokuchuo Jan 22, 2025
617b565
add news item
shikokuchuo Jan 22, 2025
1fa33c2
remove mentions of dispatcher to reduce cognitive burden
shikokuchuo Jan 23, 2025
0e08a8d
send mirai cancellation on interrupt
shikokuchuo Jan 23, 2025
765a661
simplify wording in docs
shikokuchuo Jan 23, 2025
91d73ad
chained cli errors now identical to non-parallel case
shikokuchuo Jan 23, 2025
84d6b63
mention usual to set daemons once per session
shikokuchuo Jan 23, 2025
d348652
move mmap_() to parallelization.R
shikokuchuo Jan 23, 2025
a3121a2
upload a few missing tests
shikokuchuo Jan 23, 2025
f349d71
remove option to supply a list to '.parallel', rely on carrier::crate…
shikokuchuo Jan 23, 2025
db60ece
error if ... is used with .parallel = TRUE
shikokuchuo Jan 23, 2025
9a337c3
correct map example
shikokuchuo Jan 24, 2025
60dadd1
re-export crate(); update docs
shikokuchuo Jan 24, 2025
fda5abb
refactor mmap_ to split out with_parallel_indexed_errors
shikokuchuo Jan 24, 2025
b5824c1
add point about when parallelization would be beneficial
shikokuchuo Jan 24, 2025
6858a20
update map examples
shikokuchuo Jan 24, 2025
93eaa2c
implement auto-crating
shikokuchuo Jan 24, 2025
6707b06
simplify formatting auto-crating message
shikokuchuo Jan 27, 2025
01f9198
Apply suggestions from code review
shikokuchuo Jan 27, 2025
300c16e
further wording comments from Hadley
shikokuchuo Jan 27, 2025
e433035
move mirai and carrier to suggests for now
shikokuchuo Jan 27, 2025
e8e1b91
re-draft crating docs in light of auto-crating
shikokuchuo Jan 27, 2025
ac32e66
make it easier to install mirai and carrier dependencies
shikokuchuo Jan 27, 2025
304af59
use rlang::check_installed()
shikokuchuo Jan 27, 2025
ea5fc35
add point about not making daemons() calls within a package
shikokuchuo Jan 27, 2025
61f0b4f
make parallel examples conditional on carrier as well
shikokuchuo Jan 27, 2025
cdc93bf
test conditionally
shikokuchuo Jan 27, 2025
4b4066a
cache package check results
shikokuchuo Jan 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Imports:
cli (>= 3.6.1),
lifecycle (>= 1.0.3),
magrittr (>= 1.5.0),
mirai (>= 2.0.1.9005),
rlang (>= 1.1.1),
vctrs (>= 0.6.3)
Suggests:
Expand All @@ -42,3 +43,4 @@ Config/testthat/parallel: TRUE
Encoding: UTF-8
Roxygen: list(markdown = TRUE)
RoxygenNote: 7.3.2
Remotes: shikokuchuo/mirai
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ export(cross3)
export(cross_d)
export(cross_df)
export(cross_n)
export(daemons)
export(detect)
export(detect_index)
export(discard)
Expand Down Expand Up @@ -205,6 +206,7 @@ import(vctrs)
importFrom(cli,cli_progress_bar)
importFrom(lifecycle,deprecated)
importFrom(magrittr,"%>%")
importFrom(mirai,daemons)
importFrom(rlang,"%||%")
importFrom(rlang,done)
importFrom(rlang,exec)
Expand Down
9 changes: 8 additions & 1 deletion NEWS.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
# purrr (development version)

* purrr gains the capability for parallel and distributed map, powered by the
mirai package. The argument `.parallel` has been added to `map()`, `map2()`,
`pmap()` and variants to enable this. See `?parallelization` for more details
(@shikokuchuo, #1163).

* Added a test to assert that `list_transpose()` does not work on data frames
(@KimLopezGuell, #1141, #1149).
* Added `imap_vec()` (#1084)

* Added `imap_vec()` (#1084).

* `list_transpose()` inspects all elements to determine the correct
template if it's not provided by the user (#1128, @krlmlr).

Expand Down
4 changes: 2 additions & 2 deletions R/map-if-at.R
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ map_if <- function(.x, .p, .f, ..., .else = NULL) {
#' installed, you can use `vars()` and tidyselect helpers to select
#' elements.
#' @export
map_at <- function(.x, .at, .f, ..., .progress = FALSE) {
map_at <- function(.x, .at, .f, ..., .parallel = FALSE, .progress = FALSE) {
where <- where_at(.x, .at, user_env = caller_env())

out <- vector("list", length(.x))
out[where] <- map(.x[where], .f, ..., .progress = .progress)
out[where] <- map(.x[where], .f, ..., .parallel = .parallel, .progress = .progress)
out[!where] <- .x[!where]

set_names(out, names(.x))
Expand Down
98 changes: 81 additions & 17 deletions R/map.R
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
#' This makes it easier to understand which arguments belong to which
#' function and will tend to yield better error messages.
#'
#' @param .parallel `r lifecycle::badge("experimental")` Whether to map in
#' parallel. Use `TRUE` (or a named list if supplying additional objects) to
#' parallelize using the \CRANpkg{mirai} package. Set up how and where to
#' parallelize beforehand using [`daemons()`][mirai::daemons]. See
#' [parallelization] for more details.
#' @param .progress Whether to show a progress bar. Use `TRUE` to turn on
#' a basic progress bar, use a string to give it a name, or see
#' [progress_bars] for more details.
Expand Down Expand Up @@ -125,50 +130,70 @@
#' map(\(df) lm(mpg ~ wt, data = df)) |>
#' map(summary) |>
#' map_dbl("r.squared")
map <- function(.x, .f, ..., .progress = FALSE) {
map_("list", .x, .f, ..., .progress = .progress)
#'
#' @examplesIf interactive()
#' # run in interactive sessions only as spawns additional processes
#'
#' # To use parallelized map, set daemons (number of parallel processes) first:
#' daemons(2)
shikokuchuo marked this conversation as resolved.
Show resolved Hide resolved
#' mtcars |> map_dbl(sum, .parallel = TRUE)
#' daemons(0)
#'
#' # Or wrap a statement in with() to evaluate it with the specified daemons:
#' with(daemons(2), {
#' 1:10 |>
#' map(rnorm, n = 10, .parallel = TRUE) |>
#' map_dbl(mean, .parallel = TRUE)
#' })
#'
map <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) {
map_("list", .x, .f, ..., .parallel = .parallel, .progress = .progress)
}

#' @rdname map
#' @export
map_lgl <- function(.x, .f, ..., .progress = FALSE) {
map_("logical", .x, .f, ..., .progress = .progress)
map_lgl <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) {
map_("logical", .x, .f, ..., .parallel = .parallel, .progress = .progress)
}

#' @rdname map
#' @export
map_int <- function(.x, .f, ..., .progress = FALSE) {
map_("integer", .x, .f, ..., .progress = .progress)
map_int <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) {
map_("integer", .x, .f, ..., .parallel = .parallel, .progress = .progress)
}

#' @rdname map
#' @export
map_dbl <- function(.x, .f, ..., .progress = FALSE) {
map_("double", .x, .f, ..., .progress = .progress)
map_dbl <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) {
map_("double", .x, .f, ..., .parallel = .parallel, .progress = .progress)
}

#' @rdname map
#' @export
map_chr <- function(.x, .f, ..., .progress = FALSE) {
map_chr <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) {
local_deprecation_user_env()
map_("character", .x, .f, ..., .progress = .progress)
map_("character", .x, .f, ..., .parallel = .parallel, .progress = .progress)
}

map_ <- function(.type,
.x,
.f,
...,
.parallel = FALSE,
.progress = FALSE,
.purrr_user_env = caller_env(2),
.purrr_error_call = caller_env()) {
.x <- vctrs_vec_compat(.x, .purrr_user_env)
vec_assert(.x, arg = ".x", call = .purrr_error_call)

n <- vec_size(.x)
.f <- as_mapper(.f, ...)

names <- vec_names(.x)
if (!isFALSE(.parallel)) {
return(mmap_(.x, .f, list(...), .parallel, .progress, .type, .purrr_error_call))
}

.f <- as_mapper(.f, ...)
n <- vec_size(.x)
names <- vec_names(.x)

i <- 0L
with_indexed_errors(
Expand All @@ -179,21 +204,60 @@
)
}

mmap_ <- function(.x, .f, .args, .parallel, .progress, .type, error_call) {
shikokuchuo marked this conversation as resolved.
Show resolved Hide resolved

m <- if (isTRUE(.parallel)) {
mirai::mirai_map(.x, .f, .args = .args)
} else if (is.list(.parallel)) {
mirai::mirai_map(.x, .f, as.environment(.parallel), .args = .args)
} else {
cli::cli_abort(
"'.parallel' must be TRUE/FALSE or a list, not a {.obj_type_friendly {(.parallel)}}.",
call = error_call
)
}

options <- c(".stop", if (isTRUE(.progress)) ".progress")
x <- withCallingHandlers(
mirai::collect_mirai(m, options = options),
error = function(cnd) {
location <- cnd$location
iname <- cnd$name
cli::cli_abort(
c(i = "In index: {location}.",
i = if (length(iname) && nzchar(iname)) "With name: {iname}."),
location = location,
name = iname,
parent = cnd$parent,
call = error_call,
class = "purrr_error_indexed"
)
},
interrupt = function(cnd) {
mirai::stop_mirai(m)

Check warning on line 237 in R/map.R

View check run for this annotation

Codecov / codecov/patch

R/map.R#L237

Added line #L237 was not covered by tests
}
)
if (.type != "list") {
x <- simplify_impl(x, ptype = vector(mode = .type), error_call = error_call)
}
x

}

#' @rdname map
#' @param .ptype If `NULL`, the default, the output type is the common type
#' of the elements of the result. Otherwise, supply a "prototype" giving
#' the desired type of output.
#' @export
map_vec <- function(.x, .f, ..., .ptype = NULL, .progress = FALSE) {
out <- map(.x, .f, ..., .progress = .progress)
map_vec <- function(.x, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) {
out <- map(.x, .f, ..., .parallel = .parallel, .progress = .progress)
simplify_impl(out, ptype = .ptype)
}

#' @rdname map
#' @export
walk <- function(.x, .f, ..., .progress = FALSE) {
map(.x, .f, ..., .progress = .progress)
walk <- function(.x, .f, ..., .parallel = FALSE, .progress = FALSE) {
map(.x, .f, ..., .parallel = .parallel, .progress = .progress)
invisible(.x)
}

Expand Down
37 changes: 23 additions & 14 deletions R/map2.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,36 @@
#' by_cyl <- mtcars |> split(mtcars$cyl)
#' mods <- by_cyl |> map(\(df) lm(mpg ~ wt, data = df))
#' map2(mods, by_cyl, predict)
map2 <- function(.x, .y, .f, ..., .progress = FALSE) {
map2_("list", .x, .y, .f, ..., .progress = .progress)
map2 <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) {
map2_("list", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
}
#' @export
#' @rdname map2
map2_lgl <- function(.x, .y, .f, ..., .progress = FALSE) {
map2_("logical", .x, .y, .f, ..., .progress = .progress)
map2_lgl <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) {
map2_("logical", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
}
#' @export
#' @rdname map2
map2_int <- function(.x, .y, .f, ..., .progress = FALSE) {
map2_("integer", .x, .y, .f, ..., .progress = .progress)
map2_int <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) {
map2_("integer", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
}
#' @export
#' @rdname map2
map2_dbl <- function(.x, .y, .f, ..., .progress = FALSE) {
map2_("double", .x, .y, .f, ..., .progress = .progress)
map2_dbl <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) {
map2_("double", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
}
#' @export
#' @rdname map2
map2_chr <- function(.x, .y, .f, ..., .progress = FALSE) {
map2_("character", .x, .y, .f, ..., .progress = .progress)
map2_chr <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) {
map2_("character", .x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
}

map2_ <- function(.type,
.x,
.y,
.f,
...,
.parallel = FALSE,
.progress = FALSE,
.purrr_user_env = caller_env(2),
.purrr_error_call = caller_env()) {
Expand All @@ -74,6 +75,14 @@ map2_ <- function(.type,

.f <- as_mapper(.f, ...)

if (!isFALSE(.parallel)) {
attributes(args) <- list(
class = "data.frame",
row.names = if (is.null(names)) .set_row_names(n) else names
)
return(mmap_(args, .f, list(...), .parallel, .progress, .type, .purrr_error_call))
}

i <- 0L
with_indexed_errors(
i = i,
Expand All @@ -85,14 +94,14 @@ map2_ <- function(.type,

#' @rdname map2
#' @export
map2_vec <- function(.x, .y, .f, ..., .ptype = NULL, .progress = FALSE) {
out <- map2(.x, .y, .f, ..., .progress = .progress)
map2_vec <- function(.x, .y, .f, ..., .ptype = NULL, .parallel = FALSE, .progress = FALSE) {
out <- map2(.x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
simplify_impl(out, ptype = .ptype)
}

#' @export
#' @rdname map2
walk2 <- function(.x, .y, .f, ..., .progress = FALSE) {
map2(.x, .y, .f, ..., .progress = .progress)
walk2 <- function(.x, .y, .f, ..., .parallel = FALSE, .progress = FALSE) {
map2(.x, .y, .f, ..., .parallel = .parallel, .progress = .progress)
invisible(.x)
}
4 changes: 4 additions & 0 deletions R/package-purrr.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
#' @import vctrs
#' @importFrom cli cli_progress_bar
#' @importFrom lifecycle deprecated
#' @importFrom mirai daemons
#' @useDynLib purrr, .registration = TRUE
"_PACKAGE"

the <- new_environment()

#' @export
mirai::daemons
Loading
Loading