Skip to content

Commit b676e63

Browse files
committed
Support multithreading in groupreduce
Keep the default to a single thread until we find a reliable way of predicting a reasonably optimal number of threads.
1 parent 8ea2edf commit b676e63

File tree

4 files changed

+154
-62
lines changed

4 files changed

+154
-62
lines changed

.travis.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ arch:
1717
notifications:
1818
email: false
1919

20+
21+
env:
22+
- JULIA_NUM_THREADS=2
23+
2024
after_success:
2125
- julia -e 'using Pkg; Pkg.add("Coverage"); using Coverage;
2226
Coveralls.submit(Coveralls.process_folder())'

src/DataFrames.jl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ using Base.Sort, Base.Order, Base.Iterators
77
using TableTraits, IteratorInterfaceExtensions
88
import LinearAlgebra: norm
99
using Markdown
10+
using Base.Threads
1011

1112
import DataAPI,
1213
DataAPI.All,
@@ -87,6 +88,8 @@ else
8788
export only
8889
end
8990

91+
const NTHREADS = 1
92+
9093
include("other/utils.jl")
9194
include("other/index.jl")
9295

src/groupeddataframe/splitapplycombine.jl

Lines changed: 117 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -453,18 +453,22 @@ julia> combine(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
453453
```
454454
"""
455455
function combine(f::Base.Callable, gd::GroupedDataFrame;
456-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
456+
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
457+
nthreads::Int=NTHREADS)
457458
return combine_helper(f, gd, keepkeys=keepkeys, ungroup=ungroup,
458-
copycols=true, keeprows=false, renamecols=renamecols)
459+
copycols=true, keeprows=false, renamecols=renamecols,
460+
nthreads=nthreads)
459461
end
460462

461463
combine(f::typeof(nrow), gd::GroupedDataFrame;
462-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) =
464+
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
465+
nthreads::Int=NTHREADS) =
463466
combine(gd, [nrow => :nrow], keepkeys=keepkeys, ungroup=ungroup,
464467
renamecols=renamecols)
465468

466469
function combine(p::Pair, gd::GroupedDataFrame;
467-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true)
470+
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
471+
nthreads::Int=NTHREADS)
468472
# move handling of aggregate to specialized combine
469473
p_from, p_to = p
470474

@@ -484,20 +488,24 @@ function combine(p::Pair, gd::GroupedDataFrame;
484488
cs = p_from
485489
end
486490
return combine_helper(cs => p_to, gd, keepkeys=keepkeys, ungroup=ungroup,
487-
copycols=true, keeprows=false, renamecols=renamecols)
491+
copycols=true, keeprows=false, renamecols=renamecols,
492+
nthreads=nthreads)
488493
end
489494

490495
combine(gd::GroupedDataFrame,
491496
cs::Union{Pair, typeof(nrow), ColumnIndex, MultiColumnIndex}...;
492-
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true) =
497+
keepkeys::Bool=true, ungroup::Bool=true, renamecols::Bool=true,
498+
nthreads::Int=NTHREADS) =
493499
_combine_prepare(gd, cs..., keepkeys=keepkeys, ungroup=ungroup,
494-
copycols=true, keeprows=false, renamecols=renamecols)
500+
copycols=true, keeprows=false, renamecols=renamecols,
501+
nthreads=nthreads)
495502

496503
function _combine_prepare(gd::GroupedDataFrame,
497504
@nospecialize(cs::Union{Pair, typeof(nrow),
498505
ColumnIndex, MultiColumnIndex}...);
499506
keepkeys::Bool, ungroup::Bool, copycols::Bool,
500-
keeprows::Bool, renamecols::Bool)
507+
keeprows::Bool, renamecols::Bool,
508+
nthreads::Int)
501509
cs_vec = []
502510
for p in cs
503511
if p === nrow
@@ -570,7 +578,8 @@ function _combine_prepare(gd::GroupedDataFrame,
570578
f = Pair[first(x) => first(last(x)) for x in cs_norm]
571579
nms = Symbol[last(last(x)) for x in cs_norm]
572580
return combine_helper(f, gd, nms, keepkeys=keepkeys, ungroup=ungroup,
573-
copycols=copycols, keeprows=keeprows, renamecols=renamecols)
581+
copycols=copycols, keeprows=keeprows, renamecols=renamecols,
582+
nthreads=nthreads)
574583
end
575584

576585
function gen_groups(idx::Vector{Int})
@@ -590,11 +599,12 @@ end
590599
function combine_helper(f, gd::GroupedDataFrame,
591600
nms::Union{AbstractVector{Symbol},Nothing}=nothing;
592601
keepkeys::Bool, ungroup::Bool,
593-
copycols::Bool, keeprows::Bool, renamecols::Bool)
602+
copycols::Bool, keeprows::Bool, renamecols::Bool,
603+
nthreads::Int)
594604
if !ungroup && !keepkeys
595605
throw(ArgumentError("keepkeys=false when ungroup=false is not allowed"))
596606
end
597-
idx, valscat = _combine(f, gd, nms, copycols, keeprows, renamecols)
607+
idx, valscat = _combine(f, gd, nms, copycols, keeprows, renamecols, nthreads)
598608
!keepkeys && ungroup && return valscat
599609
keys = groupcols(gd)
600610
for key in keys
@@ -985,24 +995,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
985995
end
986996

987997
function groupreduce!(res::AbstractVector, f, op, condf, adjust, checkempty::Bool,
988-
incol::AbstractVector, gd::GroupedDataFrame)
998+
incol::AbstractVector, gd::GroupedDataFrame; nthreads::Int)
989999
n = length(gd)
1000+
groups = gd.groups
9901001
if adjust !== nothing || checkempty
9911002
counts = zeros(Int, n)
9921003
end
993-
groups = gd.groups
994-
@inbounds for i in eachindex(incol, groups)
995-
gix = groups[i]
996-
x = incol[i]
997-
if gix > 0 && (condf === nothing || condf(x))
998-
# this check should be optimized out if U is not Any
999-
if eltype(res) === Any && !isassigned(res, gix)
1000-
res[gix] = f(x, gix)
1001-
else
1002-
res[gix] = op(res[gix], f(x, gix))
1004+
nt = min(nthreads, Threads.nthreads())
1005+
if nt <= 1 || axes(incol) != axes(groups)
1006+
@inbounds for i in eachindex(incol, groups)
1007+
gix = groups[i]
1008+
x = incol[i]
1009+
if gix > 0 && (condf === nothing || condf(x))
1010+
# this check should be optimized out if U is not Any
1011+
if eltype(res) === Any && !isassigned(res, gix)
1012+
res[gix] = f(x, gix)
1013+
else
1014+
res[gix] = op(res[gix], f(x, gix))
1015+
end
1016+
if adjust !== nothing || checkempty
1017+
counts[gix] += 1
1018+
end
10031019
end
1020+
end
1021+
else
1022+
res_vec = Vector{typeof(res)}(undef, nt)
1023+
# needs to be always allocated to fix type instability with @threads
1024+
counts_vec = Vector{Vector{Int}}(undef, nt)
1025+
res_vec[1] = res
1026+
if adjust !== nothing || checkempty
1027+
counts_vec[1] = counts
1028+
end
1029+
for i in 2:nt
1030+
res_vec[i] = copy(res)
10041031
if adjust !== nothing || checkempty
1005-
counts[gix] += 1
1032+
counts_vec[i] = zeros(Int, n)
1033+
end
1034+
end
1035+
Threads.@threads for tid in 1:nt
1036+
res′ = res_vec[tid]
1037+
if adjust !== nothing || checkempty
1038+
counts′ = counts_vec[tid]
1039+
end
1040+
start = 1 + ((tid - 1) * length(groups)) ÷ nt
1041+
stop = (tid * length(groups)) ÷ nt
1042+
@inbounds for i in start:stop
1043+
gix = groups[i]
1044+
x = incol[i]
1045+
if gix > 0 && (condf === nothing || condf(x))
1046+
# this check should be optimized out if U is not Any
1047+
if eltype(res′) === Any && !isassigned(res′, gix)
1048+
res′[gix] = f(x, gix)
1049+
else
1050+
res′[gix] = op(res′[gix], f(x, gix))
1051+
end
1052+
if adjust !== nothing || checkempty
1053+
counts′[gix] += 1
1054+
end
1055+
end
1056+
end
1057+
end
1058+
for i in 2:length(res_vec)
1059+
res .= op.(res, res_vec[i])
1060+
end
1061+
if adjust !== nothing || checkempty
1062+
for i in 2:length(counts_vec)
1063+
counts .+= counts_vec[i]
10061064
end
10071065
end
10081066
end
@@ -1042,26 +1100,31 @@ end
10421100

10431101
# function barrier works around type instability of groupreduce_init due to applicable
10441102
groupreduce(f, op, condf, adjust, checkempty::Bool,
1045-
incol::AbstractVector, gd::GroupedDataFrame) =
1103+
incol::AbstractVector, gd::GroupedDataFrame;
1104+
nthreads::Int) =
10461105
groupreduce!(groupreduce_init(op, condf, adjust, incol, gd),
1047-
f, op, condf, adjust, checkempty, incol, gd)
1106+
f, op, condf, adjust, checkempty, incol, gd, nthreads=nthreads)
10481107
# Avoids the overhead due to Missing when computing reduction
10491108
groupreduce(f, op, condf::typeof(!ismissing), adjust, checkempty::Bool,
1050-
incol::AbstractVector, gd::GroupedDataFrame) =
1109+
incol::AbstractVector, gd::GroupedDataFrame;
1110+
nthreads::Int) =
10511111
groupreduce!(disallowmissing(groupreduce_init(op, condf, adjust, incol, gd)),
1052-
f, op, condf, adjust, checkempty, incol, gd)
1112+
f, op, condf, adjust, checkempty, incol, gd, nthreads=nthreads)
10531113

1054-
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame) =
1055-
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd)
1114+
(r::Reduce)(incol::AbstractVector, gd::GroupedDataFrame; nthreads::Int=NTHREADS) =
1115+
groupreduce((x, i) -> x, r.op, r.condf, r.adjust, r.checkempty, incol, gd,
1116+
nthreads=nthreads)
10561117

10571118
# this definition is missing in Julia 1.0 LTS and is required by aggregation for var
10581119
# TODO: remove this when we drop 1.0 support
10591120
if VERSION < v"1.1"
10601121
Base.zero(::Type{Missing}) = missing
10611122
end
10621123

1063-
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame)
1064-
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd)
1124+
function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFrame;
1125+
nthreads::Int=NTHREADS)
1126+
means = groupreduce((x, i) -> x, Base.add_sum, agg.condf, /, false, incol, gd,
1127+
nthreads=nthreads)
10651128
# !ismissing check is purely an optimization to avoid a copy later
10661129
if eltype(means) >: Missing && agg.condf !== !ismissing
10671130
T = Union{Missing, real(eltype(means))}
@@ -1071,10 +1134,11 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
10711134
res = zeros(T, length(gd))
10721135
return groupreduce!(res, (x, i) -> @inbounds(abs2(x - means[i])), +, agg.condf,
10731136
(x, l) -> l <= 1 ? oftype(x / (l-1), NaN) : x / (l-1),
1074-
false, incol, gd)
1137+
false, incol, gd, nthreads=nthreads)
10751138
end
10761139

1077-
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame)
1140+
function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFrame;
1141+
nthreads::Int=NTHREADS)
10781142
outcol = Aggregate(var, agg.condf)(incol, gd)
10791143
if eltype(outcol) <: Union{Missing, Rational}
10801144
return sqrt.(outcol)
@@ -1083,20 +1147,25 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra
10831147
end
10841148
end
10851149

1086-
for f in (first, last)
1087-
function (agg::Aggregate{typeof(f)})(incol::AbstractVector, gd::GroupedDataFrame)
1088-
n = length(gd)
1089-
outcol = similar(incol, n)
1090-
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
1091-
if isconcretetype(eltype(outcol))
1092-
return outcol
1093-
else
1094-
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
1150+
for f in (:first, :last)
1151+
# Without using @eval the presence of a keyword argument triggers a Julia bug
1152+
@eval begin
1153+
function (agg::Aggregate{typeof($f)})(incol::AbstractVector, gd::GroupedDataFrame;
1154+
nthreads::Int=NTHREADS)
1155+
n = length(gd)
1156+
outcol = similar(incol, n)
1157+
fillfirst!(agg.condf, outcol, incol, gd, rev=agg.f === last)
1158+
if isconcretetype(eltype(outcol))
1159+
return outcol
1160+
else
1161+
return copyto_widen!(Tables.allocatecolumn(typeof(first(outcol)), n), outcol)
1162+
end
10951163
end
10961164
end
10971165
end
10981166

1099-
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame)
1167+
function (agg::Aggregate{typeof(length)})(incol::AbstractVector, gd::GroupedDataFrame;
1168+
nthreads::Int=NTHREADS)
11001169
if getfield(gd, :idx) === nothing
11011170
lens = zeros(Int, length(gd))
11021171
@inbounds for gix in gd.groups
@@ -1143,7 +1212,7 @@ end
11431212

11441213
function _combine(f::AbstractVector{<:Pair},
11451214
gd::GroupedDataFrame, nms::AbstractVector{Symbol},
1146-
copycols::Bool, keeprows::Bool, renamecols::Bool)
1215+
copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int)
11471216
# here f should be normalized and in a form of source_cols => fun
11481217
@assert all(x -> first(x) isa Union{Int, AbstractVector{Int}, AsTable}, f)
11491218
@assert all(x -> last(x) isa Base.Callable, f)
@@ -1185,7 +1254,7 @@ function _combine(f::AbstractVector{<:Pair},
11851254
if length(gd) > 0 && isagg(p, gd)
11861255
incol = parentdf[!, source_cols]
11871256
agg = check_aggregate(last(p), incol)
1188-
outcol = agg(incol, gd)
1257+
outcol = agg(incol, gd, nthreads=nthreads)
11891258
res[i] = idx_agg, outcol
11901259
elseif keeprows && fun === identity && !(source_cols isa AsTable)
11911260
@assert source_cols isa Union{Int, AbstractVector{Int}}
@@ -1283,7 +1352,7 @@ function _combine(f::AbstractVector{<:Pair},
12831352
end
12841353

12851354
function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing,
1286-
copycols::Bool, keeprows::Bool, renamecols::Bool)
1355+
copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int)
12871356
@assert copycols && !keeprows
12881357
# use `similar` as `gd` might have been subsetted
12891358
firstres = length(gd) > 0 ? fun(gd[1]) : fun(similar(parent(gd), 0))
@@ -1293,7 +1362,7 @@ function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing,
12931362
end
12941363

12951364
function _combine(p::Pair, gd::GroupedDataFrame, ::Nothing,
1296-
copycols::Bool, keeprows::Bool, renamecols::Bool)
1365+
copycols::Bool, keeprows::Bool, renamecols::Bool, nthreads::Int)
12971366
# here p should not be normalized as we allow tabular return value from fun
12981367
# map and combine should not dispatch here if p is isagg
12991368
@assert copycols && !keeprows
@@ -1708,7 +1777,7 @@ julia> select(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
17081777
```
17091778
"""
17101779
select(gd::GroupedDataFrame, args...; copycols::Bool=true, keepkeys::Bool=true,
1711-
ungroup::Bool=true, renamecols::Bool=true) =
1780+
ungroup::Bool=true, renamecols::Bool=true, nthreads::Int=NTHREADS) =
17121781
_combine_prepare(gd, args..., copycols=copycols, keepkeys=keepkeys,
17131782
ungroup=ungroup, keeprows=true, renamecols=renamecols)
17141783

0 commit comments

Comments
 (0)