Skip to content

Commit

Permalink
Use `@wkspawn` from ConcurrentUtilities instead of `Threads.@spawn` (#469)
Browse files Browse the repository at this point in the history
Should fix #336.

For more context, see the [same
fix](https://github.com/JuliaData/CSV.jl/commit/077e177)
we made for this in CSV.jl.

Basically, objects interpolated into or returned from spawned tasks can
be unexpectedly kept alive long after the task has finished, at a point
when they should already have been garbage-collected, because individual
threads hold a reference to the most recent task. Using `@wkspawn`
ensures tasks themselves don't hold references to any of these objects
once they're done executing.
  • Loading branch information
quinnj authored Jun 14, 2023
1 parent fc7cc2d commit f8d2203
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/append.jl
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ function append(
# start message writing from channel
threaded = ntasks > 1
tsk =
threaded ? (Threads.@spawn for msg in msgs
threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, sch, alignment)
Expand All @@ -191,7 +191,7 @@ function append(
end

if threaded
Threads.@spawn process_partition(
@wkspawn process_partition(
tbl_cols,
dictencodings,
largelists,
Expand Down
4 changes: 2 additions & 2 deletions src/table.jl
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
dictencoded = Dict{Int64,Meta.Field}() # dictionary id => field
sync = OrderedSynchronizer()
tsks = Channel{Any}(Inf)
tsk = Threads.@spawn begin
tsk = @wkspawn begin
i = 1
for cols in tsks
if i == 1
Expand Down Expand Up @@ -522,7 +522,7 @@ function Table(blobs::Vector{ArrowBlob}; convert::Bool=true)
elseif header isa Meta.RecordBatch
anyrecordbatches = true
@debugv 1 "parsing record batch message: compression = $(header.compression)"
Threads.@spawn begin
@wkspawn begin
cols = collect(VectorIterator(sch, $batch, dictencodings, convert))
put!(() -> put!(tsks, cols), sync, $(rbi))
end
Expand Down
4 changes: 2 additions & 2 deletions src/write.jl
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ function Base.open(
# start message writing from channel
threaded = Threads.nthreads() > 1
task =
threaded ? (Threads.@spawn for msg in msgs
threaded ? (@wkspawn for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
end) : (@async for msg in msgs
Base.write(io, msg, blocks, schema, alignment)
Expand Down Expand Up @@ -296,7 +296,7 @@ function write(writer::Writer, source)
put!(writer.msgs, recbatchmsg)
else
if writer.threaded
Threads.@spawn process_partition(
@wkspawn process_partition(
tblcols,
writer.dictencodings,
writer.largelists,
Expand Down

0 comments on commit f8d2203

Please sign in to comment.