Skip to content

Commit 1933482

Browse files
committed
Support multithreading in groupreduce
Keep the default to a single thread until we find a reliable way of predicting a reasonably optimal number of threads.
1 parent 8645651 commit 1933482

File tree

7 files changed

+301
-130
lines changed

7 files changed

+301
-130
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ jobs:
3535
- uses: actions/cache@v1
3636
env:
3737
cache-name: cache-artifacts
38+
JULIA_NUM_THREADS: 2
3839
with:
3940
path: ~/.julia/artifacts
4041
key: ${{ runner.os }}-test-${{ env.cache-name }}-${{ hashFiles('**/Project.toml') }}

NEWS.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# DataFrames v1.0 Release Notes
2+
3+
## New functionalities
4+
5+
* `combine`, `select` and `transform` with `GroupedDataFrame` now accept
6+
a `nthreads` argument which enables multithreading for some optimized
7+
grouped reductions ([#2491](https://github.com/JuliaData/DataFrames.jl/pull/2491)).
8+
9+
110
# DataFrames v0.22 Release Notes
211

312
## Breaking changes

src/DataFrames.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module DataFrames
33
using Statistics, Printf, REPL
44
using Reexport, SortingAlgorithms, Compat, Unicode, PooledArrays, CategoricalArrays
55
@reexport using Missings, InvertedIndices
6-
using Base.Sort, Base.Order, Base.Iterators
6+
using Base.Sort, Base.Order, Base.Iterators, Base.Threads
77
using TableTraits, IteratorInterfaceExtensions
88
import LinearAlgebra: norm
99
using Markdown

src/abstractdataframe/selection.jl

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -644,9 +644,10 @@ end
644644
select(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
645645
select(args::Callable, df::DataFrame; renamecols::Bool=true)
646646
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
647-
ungroup::Bool=true, renamecols::Bool=true)
647+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Integer=1)
648648
select(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
649-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
649+
keepkeys::Bool=true, ungroup::Bool=true,
650+
renamecols::Bool=true, nthreads::Integer=1)
650651
651652
Create a new data frame that contains columns from `df` or `gd` specified by
652653
`args` and return it. The result is guaranteed to have the same number of rows
@@ -664,6 +665,9 @@ $TRANSFORMATION_COMMON_RULES
664665
data frame.
665666
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
666667
frame or a `GroupedDataFrame`.
668+
- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1
669+
currently has an effect only for some optimized grouped reductions. Values higher than
670+
`Threads.nthreads()` will be replaced with that value.
667671
668672
# Examples
669673
```jldoctest
@@ -858,9 +862,11 @@ end
858862
transform(df::AbstractDataFrame, args...; copycols::Bool=true, renamecols::Bool=true)
859863
transform(f::Callable, df::DataFrame; renamecols::Bool=true)
860864
transform(gd::GroupedDataFrame, args...; copycols::Bool=true,
861-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
865+
keepkeys::Bool=true, ungroup::Bool=true,
866+
renamecols::Bool=true, nthreads::Integer=1)
862867
transform(f::Base.Callable, gd::GroupedDataFrame; copycols::Bool=true,
863-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
868+
keepkeys::Bool=true, ungroup::Bool=true,
869+
renamecols::Bool=true, nthreads::Integer=1)
864870
865871
Create a new data frame that contains columns from `df` or `gd` plus columns
866872
specified by `args` and return it. The result is guaranteed to have the same
@@ -877,6 +883,9 @@ $TRANSFORMATION_COMMON_RULES
877883
data frame.
878884
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
879885
frame or a `GroupedDataFrame`.
886+
- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1
887+
currently has an effect only for some optimized grouped reductions. Values higher than
888+
`Threads.nthreads()` will be replaced with that value.
880889
881890
Note that when the first argument is a `GroupedDataFrame`, `keepkeys=false`
882891
is needed to be able to return a different value for the grouping column:
@@ -924,9 +933,11 @@ end
924933
combine(df::AbstractDataFrame, args...; renamecols::Bool=true)
925934
combine(f::Callable, df::AbstractDataFrame; renamecols::Bool=true)
926935
combine(gd::GroupedDataFrame, args...;
927-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
936+
keepkeys::Bool=true, ungroup::Bool=true,
937+
renamecols::Bool=true, nthreads::Integer=1)
928938
combine(f::Base.Callable, gd::GroupedDataFrame;
929-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
939+
keepkeys::Bool=true, ungroup::Bool=true,
940+
renamecols::Bool=true, nthreads::Integer=1)
930941
931942
Create a new data frame that contains columns from `df` or `gd` specified by
932943
`args` and return it. The result can have any number of rows that is determined
@@ -941,6 +952,9 @@ $TRANSFORMATION_COMMON_RULES
941952
data frame.
942953
- `ungroup::Bool=true` : whether the return value of the operation on `gd` should be a data
943954
frame or a `GroupedDataFrame`.
955+
- `nthreads::Integer=1` : the number of CPU threads to use. Passing a value higher than 1
956+
currently has an effect only for some optimized grouped reductions. Values higher than
957+
`Threads.nthreads()` will be replaced with that value.
944958
945959
# Examples
946960
```jldoctest

src/groupeddataframe/fastaggregates.jl

Lines changed: 92 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -157,24 +157,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
157157
end
158158

159159
function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
160-
incol::AbstractVector, gd::GroupedDataFrame)
160+
incol::AbstractVector, gd::GroupedDataFrame, nthreads::Integer)
161161
n = length(gd)
162+
groups = gd.groups
162163
if adjust !== nothing || checkempty
163164
counts = zeros(Int, n)
164165
end
165-
groups = gd.groups
166-
@inbounds for i in eachindex(incol, groups)
167-
gix = groups[i]
168-
x = incol[i]
169-
if gix > 0 && (condf === nothing || condf(x))
170-
# this check should be optimized out if U is not Any
171-
if eltype(res) === Any && !isassigned(res, gix)
172-
res[gix] = f(x, gix)
173-
else
174-
res[gix] = op(res[gix], f(x, gix))
166+
nt = min(nthreads, Threads.nthreads())
167+
if nt <= 1 || axes(incol) != axes(groups)
168+
@inbounds for i in eachindex(incol, groups)
169+
gix = groups[i]
170+
x = incol[i]
171+
if gix > 0 && (condf === nothing || condf(x))
172+
# this check should be optimized out if U is not Any
173+
if eltype(res) === Any && !isassigned(res, gix)
174+
res[gix] = f(x, gix)
175+
else
176+
res[gix] = op(res[gix], f(x, gix))
177+
end
178+
if adjust !== nothing || checkempty
179+
counts[gix] += 1
180+
end
175181
end
182+
end
183+
else
184+
res_vec = Vector{typeof(res)}(undef, nt)
185+
# needs to be always allocated to fix type instability with @threads
186+
counts_vec = Vector{Vector{Int}}(undef, nt)
187+
res_vec[1] = res
188+
if adjust !== nothing || checkempty
189+
counts_vec[1] = counts
190+
end
191+
for i in 2:nt
192+
res_vec[i] = copy(res)
193+
if adjust !== nothing || checkempty
194+
counts_vec[i] = zeros(Int, n)
195+
end
196+
end
197+
Threads.@threads for tid in 1:nt
198+
res′ = res_vec[tid]
176199
if adjust !== nothing || checkempty
177-
counts[gix] += 1
200+
counts′ = counts_vec[tid]
201+
end
202+
start = 1 + ((tid - 1) * length(groups)) ÷ nt
203+
stop = (tid * length(groups)) ÷ nt
204+
@inbounds for i in start:stop
205+
gix = groups[i]
206+
x = incol[i]
207+
if gix > 0 && (condf === nothing || condf(x))
208+
# this check should be optimized out if U is not Any
209+
if eltype(res′) === Any && !isassigned(res′, gix)
210+
res′[gix] = f(x, gix)
211+
else
212+
res′[gix] = op(res′[gix], f(x, gix))
213+
end
214+
if adjust !== nothing || checkempty
215+
counts′[gix] += 1
216+
end
217+
end
218+
end
219+
end
220+
for i in 2:length(res_vec)
221+
res .= op.(res, res_vec[i])
222+
end
223+
if adjust !== nothing || checkempty
224+
for i in 2:length(counts_vec)
225+
counts .+= counts_vec[i]
178226
end
179227
end
180228
end
@@ -218,26 +266,31 @@ end
218266

219267
# function barrier works around type instability of groupreduce_init due to applicable
220268
groupreduce(f, op, condf, adjust, checkempty::Bool,
221-
incol::AbstractVector, gd::GroupedDataFrame) =
269+
incol::AbstractVector, gd::GroupedDataFrame,
270+
nthreads::Integer) =
222271
groupreduce!(groupreduce_init(op, condf, adjust, incol, gd),
223-
f, op, condf, adjust, checkempty, incol, gd)
272+
f, op, condf, adjust, checkempty, incol, gd, nthreads)
224273
# Avoids the overhead due to Missing when computing reduction
225274
groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
226-
incol::AbstractVector, gd::GroupedDataFrame) =
275+
incol::AbstractVector, gd::GroupedDataFrame,
276+
nthreads::Integer) =
227277
groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)),
228-
f, op, condf, adjust, checkempty, incol, gd)
278+
f, op, condf, adjust, checkempty, incol, gd, nthreads)
229279

230-
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) =
231-
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd)
280+
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame;
281+
nthreads::Integer=1) =
282+
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd, nthreads)
232283

233284
# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
234285
# TODO: remove this when we drop 1.0 support
235286
if VERSION < v"1.1"
236287
Base.zero(::Type{Missing}) = missing
237288
end
238289

239-
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame)
240-
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd)
290+
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame;
291+
nthreads::Integer=1)
292+
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false,
293+
incol, gd, nthreads)
241294
# !ismissing check is purely an optimization to avoid a copy later
242295
if eltype(means) >: Missing && agg.condf !== !ismissing
243296
T = Union{Missing, real(eltype(means))}
@@ -247,32 +300,38 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
247300
res = zeros(T, length(gd))
248301
return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
249302
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
250-
false, incol, gd)
303+
false, incol, gd, nthreads)
251304
end
252305

253-
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame)
254-
outcol = Aggregate(var, agg.condf)(incol, gd)
306+
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
307+
nthreads::Integer=1)
308+
outcol = Aggregate(var, agg.condf)(incol, gd; nthreads=nthreads)
255309
if eltype(outcol) <: Union{Missing, Rational}
256310
return sqrt.(outcol)
257311
else
258312
return map!(sqrt, outcol, outcol)
259313
end
260314
end
261315

262-
for f in (first, last)
263-
function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame)
264-
n = length(gd)
265-
outcol = similar(incol, n)
266-
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
267-
if isconcretetype(eltype(outcol))
268-
return outcol
269-
else
270-
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
316+
for f in (:first, :last)
317+
# Without using @eval the presence of a keyword argument triggers a Julia bug
318+
@eval begin
319+
function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
320+
nthreads::Integer=1)
321+
n = length(gd)
322+
outcol = similar(incol, n)
323+
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
324+
if isconcretetype(eltype(outcol))
325+
return outcol
326+
else
327+
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
328+
end
271329
end
272330
end
273331
end
274332

275-
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame)
333+
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame;
334+
nthreads::Integer=1)
276335
if getfield(gd, :idx) === nothing
277336
lens = zeros(Int, length(gd))
278337
@inbounds for gix in gd.groups

0 commit comments

Comments
 (0)