I have a question about configuring Map/Side inner join for multiple mappers in Hadoop.
Suppose I have two very large data sets A and B, I use the same partition and sort algorithm to split them into smaller parts. For A, assume I have a(1) to a(10), and for B I have b(1) to b(10). It is assured that a(1) and b(1) contain the same keys, a(2) and b(2) have the same keys, and so on. I would like to setup 10 mappers, specifically, mapper(1) to mapper(10). To my understanding, Map/Side join is a pre-processing task prior to the mapper, therefore, I would like to join a(1) and b(1) for mapper(1), to join a(2) and b(2) for mapper(2), and so on.
After reading some reference materials, it is still not clear to me how to configure these ten mappers. I understand that using CompositeInputFormat I would be able to join two files, but it seems only configuring one mapper and joining the 20 files pair after pair (in 10 sequential tasks). How to configure all these ten mappers and join ten pairs at the same time in a genuine Map/Reduce (10 tasks in parallel)? To my understanding, ten mappers would require ten CompositeInputFormat settings because the files to join are all different. I strongly believe this is practical and doable, but I couldn't figure out what exact commands I should use.
Any hint and suggestion is highly welcome and appreciated.
Shi
Thanks a lot for the replies, David and Thomas!
I appreciate your emphasis about the pre-requirements on Map-side Join. Yes, I am aware about the sort, API, etc. After reading your comments, I think my actual problem is what is the correct expression for joining multiple splits of two files in CompositeInputFormat. For example, I have dataA and dataB sorted and reduced in 2 files respectively:
/A/dataA-r-00000
/A/dataA-r-00001
/B/dataB-r-00000
/B/dataB-r-00001
The expression command I am using now is:
inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/A/dataA-r-00000"),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,"/B/dataB-r-00000"))
It works but as you mentioned, it only starts two mappers (because the inner join prevents from splitting) and could be very inefficient if the files are big. If I want to use more mappers (say another 2 mappers to join dataA-r-00001 and dataB-r-00001), how should I construct the expression, is it something like:
String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00000'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00000'), tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/dataA-r-00001'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/dataB-r-00001'))" ;
But I think that could be mistaken, because the command above actually perform inner join of four files (which will result in nothing in my case because file *r-00000 and *r-00001 have non-overlapping keys).
Or I could just use the two dirs as inputs, like:
String joinexpression = "inner(tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/A/'),tbl(org.apache.hadoop.mapred.KeyValueTextInputFormat,'/B/'))" ;
The inner join will match the pairs automatically according to the file endings, say "00000" to "00000", "00001" to "00001"? I am stuck at this point because I need to construct the expression and pass it to
conf.set("mapred.join.expr", joinexpression);
So in one word, how should I build the proper expression if I want to use more mappers to join multiple pairs of files simultaneously?
See Question&Answers more detail:
os