I am working on a Scala (2.11) / Spark (1.6.1) streaming project and using mapWithState()
to keep track of seen data from previous batches.
The state is distributed across 20 partitions on multiple nodes, created with StateSpec.function(trackStateFunc _).numPartitions(20). In this state we have only a few keys (~100), each mapped to a Set with up to ~160,000 entries, which grow over the application's lifetime. The entire state grows to about 3 GB, which each node in the cluster can handle. In each batch, some data is added to the state but none is deleted until the very end of the process, i.e. after ~15 minutes.
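For reference, here is a simplified sketch of how the state is set up, in the spirit of the snippet above. The source stream, the key/value types, and the body of trackStateFunc are illustrative placeholders, not my actual code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StateSketch {

  // Illustrative mapping function: the state per key is a Set that only grows.
  // Each batch adds the incoming value; nothing is removed until the job ends.
  def trackStateFunc(key: String, value: Option[String],
                     state: State[Set[String]]): (String, Int) = {
    val updated = state.getOption().getOrElse(Set.empty[String]) ++ value
    state.update(updated)   // write the grown Set back into the state store
    (key, updated.size)     // emit the key together with its current entry count
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("mapWithState-sketch").setMaster("local[2]")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("/tmp/state-checkpoint")   // mapWithState requires a checkpoint directory

    // Placeholder source: a socket stream of "key value" lines.
    val pairs = ssc.socketTextStream("localhost", 9999)
      .map(_.split(" "))
      .filter(_.length == 2)
      .map(parts => (parts(0), parts(1)))

    // State distributed over 20 partitions, as in my application.
    val spec        = StateSpec.function(trackStateFunc _).numPartitions(20)
    val stateStream = pairs.mapWithState(spec)

    stateStream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```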
While following the application UI, I can see that every 10th batch's processing time is very high compared to the other batches. See the images below; the yellow fields represent the high processing times.
A more detailed Job view shows that in these batches the extra time occurs at a certain point, exactly when all 20 partitions are "skipped" (or at least that is what the UI reports).
My understanding of skipped is that each state partition is one possible task which isn't executed because it doesn't need to be recomputed. However, I don't understand why the number of skips varies from Job to Job and why the last Job requires so much processing. The higher processing time occurs regardless of the state's size; the size only affects how long it takes.
Is this a bug in the mapWithState() functionality, or is this intended behaviour? Does the underlying data structure require some kind of reshuffling, or does the Set in the state need to copy data? Or is it more likely a flaw in my application?