Your example uses several nested DISTINCT operators inside a FOREACH. These run in the reducer, which relies on RAM to compute the unique values, and the whole query compiles to just one job. If a group contains too many unique elements, you can get memory-related exceptions as well.
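For reference, the single-job pattern described here usually looks something like the sketch below. This is a hypothetical reconstruction, not your exact script: the aliases `by_g`, `counts` and `*_vals` are made up for illustration, and the field names are taken from the LOAD statement used later in this answer.

```pig
-- Hypothetical sketch of the nested-DISTINCT form (one MapReduce job).
-- Aliases by_g, counts and *_vals are illustrative assumptions.
records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
by_g = GROUP records BY g;
counts = FOREACH by_g {
    -- Each nested DISTINCT runs in the reducer, over one group's bag
    a_vals = DISTINCT records.a;
    b_vals = DISTINCT records.b;
    c_vals = DISTINCT records.c;
    d_vals = DISTINCT records.d;
    GENERATE group AS g,
             COUNT(a_vals) AS a_count,
             COUNT(b_vals) AS b_count,
             COUNT(c_vals) AS c_count,
             COUNT(d_vals) AS d_count;
};
```

With a heavily skewed key, the reducer that receives the largest group has to deduplicate all four columns for that group on its own, which is where the memory pressure comes from.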
Luckily, Pig Latin is a dataflow language, so you are in effect writing the execution plan yourself. To use more CPUs, you can restructure the code so that it forces more MapReduce jobs, which can then be executed in parallel. To do that, rewrite the query without nested DISTINCT: for each column, apply DISTINCT to the (g, column) pairs and then GROUP BY g, as if you had just that one column, and then merge the results with a JOIN. It is very SQL-like, but it works. Here it is:
records = LOAD '....' USING PigStorage(',') AS (g, a, b, c, d, fd, s, w);
selected = FOREACH records GENERATE g, a, b, c, d;
grouped_a = FOREACH selected GENERATE g, a;
grouped_a = DISTINCT grouped_a;
grouped_a_count = GROUP grouped_a BY g;
grouped_a_count = FOREACH grouped_a_count GENERATE FLATTEN(group) as g, COUNT(grouped_a) as a_count;
grouped_b = FOREACH selected GENERATE g, b;
grouped_b = DISTINCT grouped_b;
grouped_b_count = GROUP grouped_b BY g;
grouped_b_count = FOREACH grouped_b_count GENERATE FLATTEN(group) as g, COUNT(grouped_b) as b_count;
grouped_c = FOREACH selected GENERATE g, c;
grouped_c = DISTINCT grouped_c;
grouped_c_count = GROUP grouped_c BY g;
grouped_c_count = FOREACH grouped_c_count GENERATE FLATTEN(group) as g, COUNT(grouped_c) as c_count;
grouped_d = FOREACH selected GENERATE g, d;
grouped_d = DISTINCT grouped_d;
grouped_d_count = GROUP grouped_d BY g;
grouped_d_count = FOREACH grouped_d_count GENERATE FLATTEN(group) as g, COUNT(grouped_d) as d_count;
mrg = JOIN grouped_a_count BY g, grouped_b_count BY g, grouped_c_count BY g, grouped_d_count BY g;
out = FOREACH mrg GENERATE grouped_a_count::g, grouped_a_count::a_count, grouped_b_count::b_count, grouped_c_count::c_count, grouped_d_count::d_count;
STORE out into '....' USING PigStorage(',');
After execution I got the following summary, which shows that the DISTINCT operations did not suffer from the skew in the data and were all processed by the first job:
Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTIme AvgMapTime MaxReduceTime MinReduceTime AvgReduceTime Alias Feature Outputs
job_201206061712_0244 669 45 75 8 13 376 18 202 grouped_a,grouped_b,grouped_c,grouped_d,records,selected DISTINCT,MULTI_QUERY
job_201206061712_0245 1 1 3 3 3 12 12 12 grouped_c_count GROUP_BY,COMBINER
job_201206061712_0246 1 1 3 3 3 12 12 12 grouped_b_count GROUP_BY,COMBINER
job_201206061712_0247 5 1 48 27 33 30 30 30 grouped_a_count GROUP_BY,COMBINER
job_201206061712_0248 1 1 3 3 3 12 12 12 grouped_d_count GROUP_BY,COMBINER
job_201206061712_0249 4 1 3 3 3 12 12 12 mrg,out HASH_JOIN ...,
Input(s):
Successfully read 52215768 records (44863559501 bytes) from: "...."
Output(s):
Successfully stored 9 records (181 bytes) in: "..."
From the Job DAG we can see that the GROUP BY operations were executed in parallel:
Job DAG:
job_201206061712_0244 -> job_201206061712_0248,job_201206061712_0246,job_201206061712_0247,job_201206061712_0245,
job_201206061712_0248 -> job_201206061712_0249,
job_201206061712_0246 -> job_201206061712_0249,
job_201206061712_0247 -> job_201206061712_0249,
job_201206061712_0245 -> job_201206061712_0249,
job_201206061712_0249
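If you want to check how a script will be split into jobs before running it against the full input, Pig's EXPLAIN operator prints the logical, physical and MapReduce plans for an alias. A minimal sketch, assuming the statements above have already been entered in the same Grunt session or script:

```pig
-- Print the plans Pig would use to compute the final alias `out`,
-- without executing any job.
EXPLAIN out;
```

The MapReduce plan section of that output should show the separate jobs, though the exact layout depends on the Pig version.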
It works fine on my datasets, where one of the group key values (in column g) makes up 95% of the data. It also gets rid of the memory-related exceptions.