Last active
May 23, 2023 15:41
-
-
Save ClaytonJY/1d31cac5bfa35d934f9abbe000d1eb18 to your computer and use it in GitHub Desktop.
Different ways to parallelize a grouped dplyr operation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(dplyr) | |
library(purrr) | |
# Helper to make example data.
#
# Builds one tibble holding `n_groups` groups. Each group gets a row count
# picked at random from `row_range` and standard-normal draws in `value`.
#
# n_groups:  number of groups; ids are the contiguous integers 1..n_groups.
# row_range: candidate group sizes, one of which is chosen per group.
# Returns a tibble with columns `group_id` and `value`.
new_tbl <- function(n_groups, row_range = 5:10) {
  # seq_len() yields no groups for n_groups = 0, where 1:0 would yield two.
  seq_len(n_groups) %>%
    map_df(~tibble(
      group_id = .x,
      # Pick the size by indexing into row_range. sample.int() expects a
      # single upper bound, so sample.int(row_range, 1L) never draws from
      # row_range itself; indexing also sidesteps sample()'s "a scalar x
      # means 1:x" behaviour when row_range has length one.
      value = rnorm(row_range[sample.int(length(row_range), 1L)])
    ))
}
# Build a function that keeps the row with the biggest value in `col`.
#
# col:  unquoted column name to rank by (captured here, used later).
# time: seconds the returned function sleeps before doing any work, to
#       simulate an expensive per-group computation.
# Returns a function of one table that sorts it by `col` in descending order
# and keeps the first row.
get_max_slicer <- function(col, time) {
  sort_col <- enquo(col)
  function(tbl) {
    Sys.sleep(time)
    ordered <- arrange(tbl, desc(!!sort_col))
    slice(ordered, 1L)
  }
}
#### functions #### | |
# Sequential baseline: apply `func` to each `group_id` group in turn.
#
# tbl:  table with a `group_id` column.
# func: function taking one group's rows and returning a data frame.
# Returns the per-group results combined by do().
do_sequential <- function(tbl, func) {
  by_group <- group_by(tbl, group_id)
  # Inside do(), `.` stands for the rows of the current group.
  do(by_group, func(.))
}
# naive-but-easy parallel | |
library(future) | |
# Run futures in background R sessions. The `multiprocess` strategy has been
# retired from the future package; `multisession` works on every platform.
plan(multisession)
# Naive-but-easy parallelism: launch one future per group.
#
# tbl:  table with a `group_id` column.
# func: function taking one group's rows and returning a data frame.
# Returns the per-group results bound into a single table.
do_small_groups <- function(tbl, func) {
  tbl %>%
    split(.$group_id) %>%
    map(~future(
      func(.x)
    )) %>%
    # value() collects the result of every future in the list. It replaces
    # the plural values(), which is no longer part of the future package.
    value() %>%
    bind_rows()
}
# Smarter, probably: one future per bucket of groups rather than per group.
#
# tbl:   table with a `group_id` column.
# func:  function taking one group's rows and returning a data frame.
# cores: number of buckets to spread the groups over.
# Returns the per-bucket results bound into a single table; rows come back
# bucket by bucket, not sorted by `group_id`.
do_big_groups <- function(tbl, func, cores = availableCores()) {
  tbl %>%
    # Bucket = group_id modulo cores; this relies on group_id being
    # contiguous integers to give evenly sized buckets.
    split(.$group_id %% cores) %>%
    map(~future(
      do_sequential(.x, func)
    )) %>%
    # value() collects the result of every future in the list. It replaces
    # the plural values(), which is no longer part of the future package.
    value() %>%
    bind_rows()
}
#### validate #### | |
# Small example: 10 groups, 0.1 s of simulated work per group.
tbl <- new_tbl(10L)
slicer <- get_max_slicer(value, 0.1)

# lst() names each element after the function it holds.
funcs <- lst(do_sequential, do_small_groups, do_big_groups)

# Run every strategy on the same data with the same slicer.
results <- map(funcs, function(strategy) strategy(tbl, slicer))

# Compare each result to the one before it; stop unless every pair matches.
map2_lgl(tail(results, -1L), head(results, -1L), all_equal) %>%
  all() %>%
  stopifnot()
#### benchmarking #### | |
library(microbenchmark) | |
library(tidyr) | |
library(ggplot2) | |
# Benchmark each strategy on one freshly generated data set.
#
# groups:    number of groups passed to new_tbl().
# time:      simulated seconds of work per slicer call.
# functions: named list of strategies, each called as fn(tbl, slicer).
# repeats:   number of timed runs per strategy.
# Returns one row of microbenchmark summary statistics (in seconds) per
# strategy, labelled by a `func` column taken from the list names.
timer <- function(groups, time, functions, repeats = 3L) {
  data <- new_tbl(groups)
  slice_fn <- get_max_slicer(value, time)

  time_one <- function(fn) {
    summary(microbenchmark(fn(data, slice_fn), times = repeats, unit = "s"))
  }

  timings <- map_dfr(functions, time_one, .id = "func")
  # The expression label carries no information here, so drop it.
  select(timings, -expr)
}
# Time every strategy over a grid of per-group work times and group counts.
# expand_grid() (tidyr, attached above) builds the same set of combinations
# as purrr's deprecated cross_df(); only the row order of the grid differs.
results <- expand_grid(
  iter_time = c(0.01, 0.1, 1),
  n_groups = c(10L, 20L, 40L)
) %>%
  mutate(timings = map2(n_groups, iter_time, timer, funcs)) %>%
  # Name the list-column explicitly, as current tidyr expects.
  unnest(cols = timings)
# Plot mean run time against group count, one panel per iteration time and
# one coloured line per strategy.
#
# tbl: benchmark results with columns n_groups, mean, func and iter_time.
# Returns the ggplot object.
plot_times <- function(tbl) {
  # Put an axis break at exactly the group counts that were benchmarked.
  group_breaks <- unique(tbl$n_groups)

  ggplot(tbl, aes(x = n_groups, y = mean, color = func)) +
    facet_wrap(~iter_time) +
    scale_x_continuous(breaks = group_breaks) + # must be a simpler way?
    geom_line() +
    geom_point()
}
# Plot all strategies, with factor levels in the order `funcs` lists them.
plot_times(mutate(results, func = factor(func, names(funcs))))
# More time, parallel options only: funcs[-1] drops the sequential baseline.
# expand_grid() (tidyr, attached above) builds the same set of combinations
# as purrr's deprecated cross_df(); only the row order of the grid differs.
p_results <- expand_grid(
  iter_time = c(0.05, 1, 2),
  n_groups = c(10L, 100L, 1000L)
) %>%
  mutate(timings = map2(n_groups, iter_time, timer, funcs[-1])) %>%
  # Name the list-column explicitly, as current tidyr expects.
  unnest(cols = timings)

p_results %>%
  mutate(func = factor(func, names(funcs))) %>%
  plot_times()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Results on 4-core i5:
Sequential can outperform many-batch parallelism, but only on super low group-counts and per-batch-processing-time; this is because it takes time to dispatch each child worker. Big buckets consistently perform better, and scale better due to no increase in communication/dispatch cost.
Further proof big batches are better, though the benefit of decreased communication costs is quickly overwhelmed by larger per-group times. Going to 2s per group would make both approaches even more similar, but scaling up to 10k groups would increase the lead of the batch-of-batches approach.