I have a simple routine that takes the product of a vector of Double
. I am attempting to parallelize this code, but many of the sparks end up fizzling. Here is a self-contained benchmark which is also provided as a gist:
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE MagicHash #-}
{-# OPTIONS_GHC -O2 -Wall -threaded -fforce-recomp #-}
import Criterion.Main
import Control.Monad (when)
import Control.Parallel.Strategies (runEval,rpar,rseq)
import qualified Data.Vector.Primitive as PV
main :: IO ()
main = do
let expected = PV.product numbers
when (not (serialProduct numbers == expected)) $ do
fail "serialProduct implementation incorrect"
defaultMain
[ bgroup "product"
[ bench "serial" $ whnf serialProduct numbers
, bench "parallel" $ whnf parallelProduct numbers
]
]
numbers :: PV.Vector Double
numbers = PV.replicate 10000000 1.00000001
{-# NOINLINE numbers #-}
serialProduct :: PV.Vector Double -> Double
serialProduct v =
let !len = PV.length v
go :: Double -> Int -> Double
go !d !ix = if ix < len then go (d * PV.unsafeIndex v ix) (ix + 1) else d
in go 1.0 0
-- | This only works when the vector length is a multiple of 8.
parallelProduct :: PV.Vector Double -> Double
parallelProduct v = runEval $ do
let chunk = div (PV.length v) 8
p2 <- rpar (serialProduct (PV.slice (chunk * 6) chunk v))
p3 <- rpar (serialProduct (PV.slice (chunk * 7) chunk v))
p1 <- rseq (serialProduct (PV.slice (chunk * 0) (chunk * 6) v))
return (p1 * p2 * p3)
This can be built and run with:
ghc -threaded parallel_compute.hs
./parallel_compute +RTS -N4 -s
I have an eight-core box, so giving the runtime four capabilities should be fine. The benchmark results are not super important, but here they are:
benchmarking product/serial
time 11.40 ms (11.30 ms .. 11.53 ms)
0.999 R2 (0.998 R2 .. 1.000 R2)
mean 11.43 ms (11.37 ms .. 11.50 ms)
std dev 167.2 μs (120.4 μs .. 210.1 μs)
benchmarking product/parallel
time 10.03 ms (9.949 ms .. 10.15 ms)
0.999 R2 (0.999 R2 .. 1.000 R2)
mean 10.17 ms (10.11 ms .. 10.31 ms)
std dev 235.7 μs (133.4 μs .. 426.2 μs)
Now, the runtime statistics. This is where I'm confused:
124,508,840 bytes allocated in the heap
529,843,176 bytes copied during GC
80,232,008 bytes maximum residency (8344 sample(s))
901,272 bytes maximum slop
83 MB total memory in use (0 MB lost due to fragmentation)
Tot time (elapsed) Avg pause Max pause
Gen 0 19 colls, 19 par 0.008s 0.001s 0.0001s 0.0003s
Gen 1 8344 colls, 8343 par 2.916s 1.388s 0.0002s 0.0008s
Parallel GC work balance: 76.45% (serial 0%, perfect 100%)
TASKS: 13 (1 bound, 12 peak workers (12 total), using -N4)
SPARKS: 1024 (502 converted, 0 overflowed, 0 dud, 28 GC'd, 494 fizzled)
INIT time 0.000s ( 0.002s elapsed)
MUT time 11.480s ( 10.414s elapsed)
GC time 2.924s ( 1.389s elapsed)
EXIT time 0.004s ( 0.005s elapsed)
Total time 14.408s ( 11.811s elapsed)
Alloc rate 10,845,717 bytes per MUT second
Productivity 79.7% of total user, 88.2% of total elapsed
In the section that deals with sparks, we can see that about half of them fizzle. This seems unbelievable to me. In parallelProduct
, we have the main thread work on a task 6 times larger than what is given to either of the sparks. However, it seems like one of these sparks always gets fizzled (or GCed). And this isn't a small job either. We're talking about a computation that takes milliseconds, so it seems implausible that the main thread could finish it before the other thunks get sparked.
My understanding (which could be totally wrong) is that this kind of computation should be ideal for the concurrent runtime. Garbage collection seems to be the biggest problem for concurrent applications in GHC, but the task I'm doing here doesn't generate any almost garbage, since GHC turns the innards of serialProduct
into a tight loop with everything unboxed.
On the upside, we do see an 11% speedup for the parallel version in the benchmarks. So, the eighth portion of the work that was successfully sparked really did make a measurable impact. I'm just wondering why that other spark doesn't work like I expect it to.
Any help on understanding this would be appreciated.
EDIT
I've update the gist to include another implementation:
-- | This only works when the vector length is a multiple of 4.
parallelProductFork :: PV.Vector Double -> Double
parallelProductFork v = unsafePerformIO $ do
let chunk = div (PV.length v) 4
var <- newEmptyMVar
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 0) chunk v)) >>= putMVar var
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 1) chunk v)) >>= putMVar var
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 2) chunk v)) >>= putMVar var
_ <- forkIO $ evaluate (serialProduct (PV.slice (chunk * 3) chunk v)) >>= putMVar var
a <- takeMVar var
b <- takeMVar var
c <- takeMVar var
d <- takeMVar var
return (a * b * c * d)
This one has excellent performance:
benchmarking product/parallel mvar
time 3.814 ms (3.669 ms .. 3.946 ms)
0.986 R2 (0.977 R2 .. 0.992 R2)
mean 3.818 ms (3.708 ms .. 3.964 ms)
std dev 385.6 μs (317.1 μs .. 439.8 μs)
variance introduced by outliers: 64% (severely inflated)
But, it falls back on conventional concurrency primitives instead of using sparks. I do not like this solution, but I am providing it as evidence that it should be possible to achieve the same performance with a spark-based approach.
See Question&Answers more detail:
os