Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

apache spark - How to avoid shuffles while joining DataFrames on unique keys?

I have two DataFrames A and B:

  • A has columns (id, info1, info2) with about 200 million rows
  • B has only the column id, with 1 million rows

The id column is unique in both DataFrames.

I want a new DataFrame which filters A to only include values from B.

If B were very small, I know I would do something along the lines of

A.filter($"id".isin(B.select("id").collect().map(_.get(0)): _*))

But B is still fairly large, so it cannot all fit in a broadcast variable.

I know I could also use

A.join(B, Seq("id"))

but that wouldn't take advantage of the uniqueness of id, and I'm afraid it will cause unnecessary shuffles.

What is the optimal method to achieve that task?

1 Answer

If you have not applied a partitioner to A, this may help you understand how joins and shuffles interact.

Without a partitioner:

A.join(B, Seq("id"))

By default, this operation hashes the keys of both DataFrames, sends elements with the same key hash across the network to the same machine, and then joins the elements with the same key on that machine. Note that both DataFrames are shuffled across the network.
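
The hash, move, then join locally sequence described above can be imitated in plain Scala without Spark. This is only a rough sketch: HashJoinSketch, partitionOf, shuffle and join are hypothetical names made up for illustration, the "partitions" are just in-memory groups, and the partition rule copies the one used by Spark's RDD HashPartitioner (Spark SQL hashes DataFrame keys differently).

```scala
object HashJoinSketch {
  // Same rule as Spark's RDD HashPartitioner: non-negative (hashCode mod numPartitions).
  def partitionOf(key: Long, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Stand-in for a shuffle: group records by the partition their key hashes to.
  def shuffle[V](rows: Seq[(Long, V)], numPartitions: Int): Map[Int, Seq[(Long, V)]] =
    rows.groupBy { case (id, _) => partitionOf(id, numPartitions) }

  // Both sides are shuffled, then each partition is joined on its own.
  // Equal ids always hash to the same partition, so no match is missed.
  def join[V](a: Seq[(Long, V)], bIds: Seq[Long], numPartitions: Int): Seq[(Long, V)] = {
    val aParts = shuffle(a, numPartitions)
    val bParts = shuffle(bIds.map(id => (id, ())), numPartitions)
    (0 until numPartitions).flatMap { p =>
      val idsHere = bParts.getOrElse(p, Seq.empty).map(_._1).toSet
      aParts.getOrElse(p, Seq.empty).filter { case (id, _) => idsHere.contains(id) }
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Seq((1L, "x"), (2L, "y"), (3L, "z"), (4L, "w"))
    val b = Seq(2L, 4L, 5L)
    // Keeps only the rows of a whose id also appears in b (ids 2 and 4).
    println(join(a, b, numPartitions = 3).sortBy(_._1))
  }
}
```

In this picture, a dataset that is already grouped by the same rule and the same number of partitions would not need its shuffle step repeated, which is the idea behind pre-partitioning the larger side.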

With HashPartitioner:

Note that partitionBy() is a method on pair RDDs, not on DataFrames, so this applies when A is built as an RDD keyed by id. Call partitionBy() when building A, and Spark will then know that A is hash-partitioned; calls to join() on it take advantage of this information. In particular, when A is joined with B, Spark shuffles only the B RDD. Since B has far less data than A, you don't need to apply a partitioner to B.

Example:

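 import org.apache.spark.HashPartitioner

 // partitionBy() needs a pair RDD; the (Long, String) key/value types are placeholders
 val A = sc.sequenceFile[Long, String]("hdfs://...")
     .partitionBy(new HashPartitioner(100)) // Create 100 partitions
     .persist()
 // B must also be a pair RDD keyed by id; only B is shuffled here
 A.join(B)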

This explanation is based on the book Learning Spark.
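
Since the question uses DataFrames while partitionBy() above belongs to the RDD API, a possible DataFrame counterpart is repartition() on the id column followed by persist(). The sketch below is not from the book and rests on several assumptions: the object and helper names (PrePartitionedJoinSketch, prePartition, keepIdsIn) are hypothetical, the tiny in-memory tables stand in for the real A and B, automatic broadcast is switched off so they behave like large tables, and spark.sql.shuffle.partitions is set to match A's 100 partitions. It also uses a left_semi join, which returns only A's columns for ids present in B.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object PrePartitionedJoinSketch {
  // Hash-partition a by id once and cache the result for later joins.
  def prePartition(a: DataFrame, numPartitions: Int): DataFrame =
    a.repartition(numPartitions, a("id")).persist()

  // Keep only the rows of a whose id also appears in b.
  def keepIdsIn(a: DataFrame, b: DataFrame): DataFrame =
    a.join(b, Seq("id"), "left_semi")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("pre-partitioned-join-sketch")
      .master("local[*]")
      // Assumptions: match A's partition count, and disable auto-broadcast
      // so the tiny stand-in tables are planned like large ones.
      .config("spark.sql.shuffle.partitions", "100")
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for the real A (about 200 million rows) and B (1 million rows).
    val A = prePartition(
      Seq((1L, "a1", "b1"), (2L, "a2", "b2"), (3L, "a3", "b3")).toDF("id", "info1", "info2"),
      numPartitions = 100)
    val B = Seq(1L, 3L).toDF("id")

    val filtered = keepIdsIn(A, B)
    filtered.explain() // look for Exchange nodes to see which side is shuffled
    filtered.show()

    spark.stop()
  }
}
```

Two caveats apply. The repartition() call is itself a full shuffle of A, so it only pays off when the partitioned copy is reused across several joins. Whether the planner then reuses A's partitioning depends on the Spark version and settings, so the explain() output is worth checking rather than assuming.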

