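One thing that doesn't work in your example is that all threads may mutate the same vectors (result and a) at the same time via push!, which allows race conditions to occur.
One way to fix this is to keep a collection of vectors, one per thread. Each thread only modifies its own vector (identified by threadid()).
Using this technique, a simplified version of your example could look like this: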
# The function we want to apply to each element
julia> f(x) = 2x+1
f (generic function with 1 method)
# Two collections of vectors (one vector for each thread)
# that will hold the results for each thread
julia> results = [Float64[] for _ in 1:Threads.nthreads()];
julia> as = [Float64[] for _ in 1:Threads.nthreads()]
8-element Vector{Vector{Float64}}:
[]
[]
[]
[]
[]
[]
[]
[]
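julia> Threads.@threads :static for a in 1:10
           result = f(a)
           # Each thread only ever mutates its own result vector:
           # results[Threads.threadid()]
           # (:static keeps each iteration on one thread, so threadid()
           # cannot change partway through an iteration)
           push!(results[Threads.threadid()], result)
           push!(as[Threads.threadid()], a)
       end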
Note that you get a collection of results, indexed by the ID of the thread that produced them.
# Now you get a collection of results, indexed by the id of the thread which produced them
julia> results
8-element Vector{Vector{Float64}}:
[3.0, 5.0] # These results have been produced by thread #1
[7.0, 9.0]
[11.0]
[13.0]
[15.0]
[17.0]
[19.0]
[21.0]
julia> as
8-element Vector{Vector{Float64}}:
[1.0, 2.0]
[3.0, 4.0]
[5.0]
[6.0]
[7.0]
[8.0]
[9.0]
[10.0]
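Indexing buffers by threadid() is only safe when a task cannot migrate between threads while it runs (as with the :static schedule). As a hedged sketch of a variant that does not depend on thread IDs at all, the work can be split into chunks and each chunk given its own private vector. The helper name per_chunk_results and its nchunks keyword are hypothetical, not part of the original example, and the Float64 element type is an assumption carried over from above.

```julia
using Base.Threads: @spawn, nthreads

f(x) = 2x + 1

# Hypothetical helper: one task per chunk, one private buffer per task.
function per_chunk_results(f, xs; nchunks = nthreads())
    chunk_len = max(1, cld(length(xs), nchunks))
    tasks = map(Iterators.partition(xs, chunk_len)) do chunk
        @spawn begin
            buf = Float64[]          # only this task ever touches buf
            for x in chunk
                push!(buf, f(x))
            end
            buf                      # the task's return value
        end
    end
    # Fetching in task order keeps the chunks in input order
    return fetch.(tasks)
end

results = per_chunk_results(f, 1:10)
```

Here results is again a vector of vectors, but indexed by chunk rather than by thread ID, so it can be combined in the same way as the per-thread vectors.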
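Finally, you need to concatenate or flatten the result vectors in some way, so that the thread-specific results are merged into one. One way is to concatenate all the results (this allocates a new, large vector to hold them all):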
julia> reduce(vcat, results)
10-element Vector{Float64}:
3.0
5.0
7.0
9.0
11.0
13.0
15.0
17.0
19.0
21.0
julia> reduce(vcat, as)
10-element Vector{Float64}:
1.0
2.0
3.0
4.0
5.0
6.0
7.0
8.0
9.0
10.0
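To make the single allocation explicit, the concatenation can also be written out by hand. This is only a rough sketch of the same idea, not a replacement for reduce(vcat, ...); the sample data in results is made up for illustration.

```julia
# Made-up nested results, including one empty per-thread vector
results = [[3.0, 5.0], [7.0, 9.0], [11.0], Float64[]]

flat = Float64[]
# Reserve room for every element up front so append! need not regrow the vector
sizehint!(flat, sum(length, results))
for r in results
    append!(flat, r)   # copies r's elements onto the end of flat
end
```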
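Another way is to iterate directly over the nested results, flattening them on the fly (which avoids allocating twice the memory just to store them in flat form):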
julia> using Base.Iterators: flatten
julia> for r in flatten(results)
           println(r)
       end
3.0
5.0
7.0
9.0
11.0
13.0
15.0
17.0
19.0
21.0
julia> for (a, r) in zip(flatten(as), flatten(results))
           println("$a -> $r")
       end
1.0 -> 3.0
2.0 -> 5.0
3.0 -> 7.0
4.0 -> 9.0
5.0 -> 11.0
6.0 -> 13.0
7.0 -> 15.0
8.0 -> 17.0
9.0 -> 19.0
10.0 -> 21.0
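For comparison, a different technique from per-thread vectors is to keep a single shared vector and guard every push! with a lock. The sketch below assumes the same f as above; the names shared and lk are illustrative. It is simpler, but threads can contend for the lock, and the results arrive in no particular order.

```julia
using Base.Threads: @threads

f(x) = 2x + 1

shared = Float64[]
lk = ReentrantLock()

@threads for a in 1:10
    r = f(a)              # the actual work still runs in parallel
    lock(lk) do
        push!(shared, r)  # only one thread at a time mutates shared
    end
end

# Arrival order depends on scheduling, so sort if order matters
sort!(shared)
```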