
spark.sql.shuffle.partitions of 200 default partitions conundrum

In many posts, in answer to some question about shuffling or partitioning (due to a JOIN, an aggregation, etc.), there is a statement in some form like the following:

... In general whenever you do a spark sql aggregation or join which shuffles data this is the number of resulting partitions = 200. This is set by spark.sql.shuffle.partitions. ...
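As a minimal sketch of that claim (assuming a SparkSession named spark and Spark 2.x behaviour with no adaptive execution coalescing partitions):

// Sketch only: a groupBy forces a shuffle, so the result picks up
// spark.sql.shuffle.partitions (200 by default).
import spark.implicits._

val demo = (1 to 1000000).toDF("id")
val aggregated = demo.groupBy($"id" % 10).count()

println(demo.rdd.getNumPartitions)        // whatever the input parallelism was
println(aggregated.rdd.getNumPartitions)  // 200 under the default setting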

So, my question is:

  • Do we mean that if we have set partitioning at 765 for a DF, for example,
    • the processing occurs against 765 partitions, but the output is coalesced / re-partitioned to the standard 200 (which is what the word resulting refers to)?
    • or does it do the processing using 200 partitions, i.e. coalescing / re-partitioning to 200 partitions before the JOIN or aggregation?

I ask because I have never seen a clear answer on this point.

I did the following test:

// generated a DS of some 20M short rows
df0.count
val ds1 = df0.repartition(765)
ds1.count
val ds2 = df0.repartition(765)
ds2.count

sqlContext.setConf("spark.sql.shuffle.partitions", "765")
// The above was not included on the 1st run; it was included on the 2nd run.

ds1.rdd.partitions.size
ds2.rdd.partitions.size

val joined = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer") 
joined.rdd.partitions.size
joined.count
joined.rdd.partitions.size

On the 1st test - without sqlContext.setConf("spark.sql.shuffle.partitions", "765") - the processing and the number of resulting partitions was 200, even though SO post 45704156 states it may not apply to DFs - and this is a DS.

On the 2nd test - with sqlContext.setConf("spark.sql.shuffle.partitions", "765") - the processing and the number of resulting partitions was 765, again even though SO post 45704156 states it may not apply to DFs - and this is a DS.
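For reference, the equivalent on the SparkSession API would be something like the sketch below (assuming a session named spark and the same ds1 / ds2 as above):

// Equivalent of the sqlContext.setConf call, set before the join executes
spark.conf.set("spark.sql.shuffle.partitions", "765")

val joined765 = ds1.join(ds2, ds1("time_asc") === ds2("time_asc"), "outer")
println(joined765.rdd.getNumPartitions)  // 765, matching the 2nd test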



1 Answer


spark.sql.shuffle.partitions is the parameter that decides the number of partitions used when a shuffle occurs, i.e. for joins or aggregations where data moves across the nodes. The other setting, spark.default.parallelism, is calculated from your data size and the maximum block size (128 MB in HDFS). So if your job does not perform any shuffle, it uses the default parallelism value, or, if you are working with RDDs, you can set it yourself. Whenever a shuffle does happen, spark.sql.shuffle.partitions applies, and its default is 200.

val df = sc.parallelize(List(1, 2, 3, 4, 5), 4).toDF()
df.count()               // this will use 4 partitions

val df1 = df
df1.except(df).count()   // will generate 200 partitions, running as 2 stages
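As a further sketch of the same point (assuming the df above, a SparkSession named spark, and no adaptive execution), lowering the setting changes the post-shuffle partition count:

// Changing spark.sql.shuffle.partitions changes the partition count after a shuffle
spark.conf.set("spark.sql.shuffle.partitions", "50")

val grouped = df.groupBy("value").count()   // "value" is the default column name from toDF()
println(grouped.rdd.getNumPartitions)       // 50 instead of the default 200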

