
Parallelized map using mirai #1163

Open · wants to merge 38 commits into base: main
Conversation

shikokuchuo (Contributor) commented Jan 22, 2025:

@hadley this builds on the proof of concept you reviewed last week at shikokuchuo#1.

Adds a .parallel argument to every function that currently accepts a .progress argument, i.e.:

  • map() and variants
  • map2() and variants
  • pmap() and variants
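As a rough sketch of the intended usage (hedged: this assumes the dev purrr from this branch together with dev mirai, with `daemons()` available as in the reprex later in this thread):

```r
library(purrr)

# Launch three local daemons (background worker processes) first;
# .parallel = TRUE never creates daemons on its own.
daemons(3)

slow_square <- function(x) {
  Sys.sleep(1)
  x^2
}

# Elements are dispatched across the daemons; results are returned
# in input order, as with the sequential map().
map(1:3, slow_square, .parallel = TRUE)

# Reset daemons when finished.
daemons(0)
```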

There is a full test suite added at test-parallel.R, which runs parallel versions of all the tests from the map/map2/pmap test files, so I'm fairly confident that the behaviour is consistent.

This version relies on the current dev version of mirai. I made some upstream changes to align behaviour, in particular so that error messages are consistent and identical to those in the normal map cases. I'm ready to make another mirai release once this PR is merged.

I've added a couple of simple examples at the bottom of map() so it's hopefully clear to users what they should be doing.

The full documentation is available as an Rd page at ?parallelization. I've added it this way, as opposed to a vignette, to mirror how .progress is documented. It also has the advantage of linking easily from function documentation. This contains:

  • An explanation of when to provide a list to .parallel
  • How to set daemons()
  • The with() method for daemons
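The `with()` method mentioned above scopes daemons to a single expression; a minimal sketch, assuming mirai's documented `with()` support for daemons:

```r
# Daemons exist only for the duration of the expression and are
# torn down automatically afterwards.
with(
  mirai::daemons(4),
  purrr::map(1:8, function(x) Sys.getpid(), .parallel = TRUE)
)
```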

Conceptually, this has been mostly an interface-mapping exercise, with purrr directly using the public exported functions mirai_map() and collect_mirai() from the mirai package. I've focused almost completely on consistency with the normal, non-parallel, versions thus far.

I've resisted the urge to do any specific optimization for efficiency, as it's likely premature. I'm just noting here that this is likely to yield gains in future.

Welcome your review!

cc. @jcheng5 FYI.

closes #1162.

hadley (Member) left a comment:

This is looking great. Thanks so much for working on this!

(Resolved review threads on R/map.R and R/parallelization.R.)
lionel- (Member) left a comment:

Great to see progress on this front!

I'm not familiar with mirai so I mainly have a high level comment about potentially using carrier for the treatment of functions.

(Resolved review threads on R/parallelization.R.)
@@ -0,0 +1,104 @@
#' Parallelization in purrr
Member commented:

I think we need some documentation about what happens if you do nested map(.parallel = TRUE) calls, i.e. what happens in that case and what are the best practices.

In particular, if I want the outer nest to send elements to remote workers, but then I want the inner nest to run in parallel on those workers, how do I achieve that? That was a common furrr question.

shikokuchuo (Contributor, Author) commented:

Let me take a look at the furrr docs to refresh my understanding of why it's an issue there. From what I remember it's the way that chunking is done, plus the lack of queuing in the old parallel backend. mirai should be completely flexible and just work in this respect, but I'll add something to the docs on this point.

shikokuchuo (Contributor, Author) commented:

I didn't see anything specific in the furrr docs, but I did read the section on nested topologies at https://future.futureverse.org/index.html?q=nested#nested-futures-and-evaluation-topologies. In conclusion, I think this is specific to the design of future and its syntax, which requires you to define the two layers up front.

mirai can spawn other mirai and so on, and isn't limited by the initial daemons() setting. So you can easily have daemons launched over the network, and then each of those launch local daemons for example.

For mirai/purrr it works just like the below. The only thing you really need to remember is to set daemons within the function. I haven't added anything specifically to the docs as I really want to keep that focused on (i) daemons and (ii) crating.

I'd be happy to add to a vignette down the line along with whatever other issues people start reporting.

purrr::daemons(3)
#> [1] 3
f <- function(x) {
  purrr::daemons(2)
  purrr::map(1:2, function(x) Sys.getpid(), .parallel = TRUE)
}
purrr::map(1:3, f, .parallel = TRUE)
#> [[1]]
#> [[1]][[1]]
#> [1] 68371
#> 
#> [[1]][[2]]
#> [1] 68366
#> 
#> 
#> [[2]]
#> [[2]][[1]]
#> [1] 68295
#> 
#> [[2]][[2]]
#> [1] 68297
#> 
#> 
#> [[3]]
#> [[3]][[1]]
#> [1] 68331
#> 
#> [[3]][[2]]
#> [1] 68334

Created on 2025-01-24 with reprex v2.1.1

hadley (Member) commented:

I think the primary concern is that if you accidentally use .parallel = TRUE with a function that already uses .parallel = TRUE, you can end up with a multiplicative explosion of processes, and this sucks up a bunch of system memory etc.

I just did mirai::daemons(1e4) and hard crashed my computer 😬

shikokuchuo (Contributor, Author) commented:

Right, so I think that means we're protected, as just using .parallel = TRUE never creates any daemons, so you can't 'accidentally' do this. If you supply a function that itself contains a map(.parallel = TRUE), that would simply error in the daemon.

Note my example above only works as we explicitly call daemons(n) as part of the function. This is also a reason why we require the user to explicitly supply this number rather than leave it to a function that e.g. automatically detects the number of cores. There is also long-standing guidance for package authors to leave daemons settings to the user.

Btw. mirai::daemons(1e4) is just a simple OOM. It will handle that many connections i.e. over the network, but trying to launch that many R processes on one machine is just too much...

Member commented:

This isn't a problem right now, but we need something to protect you when calling purrr::map(1:100, pkg::foo, .parallel = TRUE) where, unbeknownst to you, pkg::foo() is creating 10 daemons.

mirai::daemons(1e4) felt like more than a regular OOM problem because I got the spinning beach ball of death.

shikokuchuo (Contributor, Author) commented Jan 27, 2025:

This isn't a problem right now, but we need something to protect you when calling purrr::map(1:100, pkg::foo, .parallel = TRUE) where, unbeknownst to you, pkg::foo() is creating 10 daemons.

I think this is where we bring in mirai's concept of 'compute profiles'. But that adds a bit of complication - I'm thinking we get users familiar with parallel purrr first, and then bring in more features.

mirai::daemons(1e4) felt more than a regular OOM problem because I got the spinning beach ball of death.

I see, sorry! I'm on Ubuntu so I have an OOM killer by default - I don't know what the equivalent is on a Mac (I'm guessing).

shikokuchuo (Contributor, Author) commented Jan 27, 2025:

This isn't a problem right now, but we need something to protect you when calling purrr::map(1:100, pkg::foo, .parallel = TRUE) where, unbeknownst to you, pkg::foo() is creating 10 daemons.

Btw. your hypothetical would only work if the daemons were being launched from different machines - so it wouldn't cause a situation where you end up with 1,000 daemons on the same machine.

If you try to revise daemons() settings on the same machine without an explicit reset, you'd just error on a subsequent attempt - so I think there's already protection built in.

Don't worry I see it doesn't quite apply to your example.

Member commented:

I think we're good for now if we tell people not to set daemons in a package, but it's something we'll need to come back to again in the future.

(Resolved review threads on R/parallelization.R.)
Comment on lines 60 to 63
#' * `n`: the number of daemons to launch on your local machine, e.g.
#' `daemons(7)`. As a rule of thumb, for maximum efficiency this should be (at
#' most) one less than the number of cores on your machine, leaving one core
#' for the main R process.
Member commented:

I feel like Henrik would kill me if we didn't at least consider mentioning parallelly::availableCores() as a way to figure out the number of cores on your machine. Anything we can do to help prevent people from maxing out their shared HPC workers is probably a good thing.

shikokuchuo (Contributor, Author) commented:

This is indeed the reason why the number of daemons is user-supplied. It's not clear that any automated function is going to get this right, precisely as we can't know what else is going on in the compute environment at the same time.

Member commented:

For now, you might want to reinforce that you should never call daemons() inside of a package.

shikokuchuo (Contributor, Author) commented:

OK, I'm fine to do that in more places within mirai docs. Let me know if you wanted some wording in purrr itself.

shikokuchuo (Contributor, Author) commented:

For now, you might want to reinforce that you should never call daemons() inside of a package.

I've added a point in ea5fc35 to this section of the docs.

(Resolved review threads on R/parallelization.R.)
lionel- (Member) left a comment:

It would be nice to avoid implicit deletion of the function environment as I think it is surprising behaviour.

To that end when a non-package function is supplied, we could:

  • Fail and recommend calling crate()
  • Auto-crate and print a message about it (so it's not implicit) with the crate size (suggested by @hadley on Slack).

Regarding auto-crating, it possibly should be restricted to the case of calling from the global env as determined by identical(topenv(caller_env()), globalenv()). In that case it would be regarded as a script user convenience whereas package authors would have to explicitly crate.

Should we set the default argument of .parallel to TRUE when the input is a crate?
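For context, the explicit crating described above might look like the sketch below (an assumption-laden illustration of carrier's crate() API, where objects passed as named arguments are bundled with the function):

```r
offset <- 10

# An ordinary function would capture `offset` from its enclosing
# environment, which would otherwise have to travel to the daemon.
# crate() isolates the function and makes its dependencies explicit,
# so only the self-contained crate is shipped.
crated_f <- carrier::crate(function(x) x + offset, offset = offset)

purrr::map(1:3, crated_f, .parallel = TRUE)
```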

DavisVaughan (Member) commented:
Should we set the default argument of .parallel to TRUE when the input is a crate?

I think that would be a bit too magical. I would want to see an explicit .parallel = TRUE call in my code when I come back to it in 6 months.

shikokuchuo (Contributor, Author) commented:
To that end when a non-package function is supplied, we could:
* Auto-crate and print a message about it (so it's not implicit) with the crate size (suggested by @hadley on Slack).

Auto-crating for non-package functions is now implemented in 93eaa2c. @hadley I believe this now rounds out the interface you had in mind:

  • error on no daemons set
  • auto-crating
  • no use of ...

Regarding auto-crating, it possibly should be restricted to the case of calling from the global env as determined by identical(topenv(caller_env()), globalenv()). In that case it would be regarded as a script user convenience whereas package authors would have to explicitly crate.

I think it could be confusing having different behaviours here depending on whether it's called from a package.

More importantly, if the goal is to encourage package authors to use crate(), then I think this is achieved by the current behaviour, as auto-crating doesn't happen when a crate is passed to .f. So they would avoid this by explicitly crating themselves - I think that makes sense.

hadley (Member) left a comment:

This looks like a great first pass at this feature, and I think we're on track to merge it after this last round of (mostly minor) feedback.

(Resolved review threads on DESCRIPTION and R/map.R.)
(Resolved review threads on R/parallelization.R.)
R/map.R (outdated):

ensure_parallel_dependencies <- function(error_call = NULL) {
Member commented:

Is there a reason not to use rlang::check_installed() here?

shikokuchuo (Contributor, Author) commented:

Hmm good point, let me switch.

shikokuchuo (Contributor, Author) commented:

Done in 304af59. Saves a lot of boilerplate!

shikokuchuo (Contributor, Author) commented:

Changed to still cache the result in 4b4066a as rlang::check_installed() is rather expensive.
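A caching wrapper along these lines is one way to do that (a sketch only; the helper actually committed in 4b4066a may differ):

```r
# Environment used as a session-level cache, so the relatively
# expensive rlang::check_installed() call runs at most once.
the <- new.env(parent = emptyenv())

ensure_parallel_dependencies <- function(error_call = rlang::caller_env()) {
  if (is.null(the$parallel_deps_ok)) {
    # Prompts interactively to install missing packages, else errors.
    rlang::check_installed(c("mirai", "carrier"), call = error_call)
    the$parallel_deps_ok <- TRUE
  }
  invisible(TRUE)
}
```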


# ---------------------------------------------------------------------------

mirai::daemons(0)
Member commented:

This shouldn't be necessary, right? (Because of the on.exit() call above.) Or is it just belt-and-braces for someone running the file line-by-line?

shikokuchuo (Contributor, Author) commented Jan 27, 2025:

I just want to be extra careful. It's always safe to call daemons(0) - it's a no-op if no daemons are set.

shikokuchuo (Contributor, Author) commented Jan 27, 2025:

@hadley I've been through your review comments. Notably, I've moved mirai and carrier to suggests in e433035.

I've also re-worded the 'crating' section of the parallelization docs in light of auto-crating.

There are further mirai features that purrr can take advantage of down the line, but I think what we have now makes for a good starting point, without over-burdening users.

shikokuchuo (Contributor, Author) commented:

Final changes my side, I'll make a cut of mirai once this is merged.

Successfully merging this pull request may close: Parallelized Map.

4 participants