Below is the sample code that I am running. when this spark job runs, Dataframe joins are happening using sortmergejoin instead of broadcastjoin.
def joinedDf (sqlContext: SQLContext,
txnTable: DataFrame,
countriesDfBroadcast: Broadcast[DataFrame]):
DataFrame = {
txnTable.as("df1").join((countriesDfBroadcast.value).withColumnRenamed("CNTRY_ID", "DW_CNTRY_ID").as("countries"),
$"df1.USER_CNTRY_ID" === $"countries.DW_CNTRY_ID", "inner")
}
joinedDf(sqlContext, txnTable, countriesDfBroadcast).write.parquet("temp")
The broadcastjoin is not happening even when I specify a broadcast() hint in the join statement.
The optimizer is hashpartitioning the dataframe and it is causing data skew.
Has anyone seen this behavior?
I am running this on yarn using Spark 1.6 and HiveContext as SQLContext. The spark job runs on 200 executors. and the data size of the txnTable is 240GB and the datasize of countriesDf is 5mb.
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…