
scala - How to find exact median for grouped data in Spark

I have a requirement to calculate the exact median on a grouped data set of Double datatype in Spark using Scala.

It is different from the similar question Find median in Spark SQL for multiple double datatype columns: that question is about finding the median at the RDD level, whereas this one is about finding the median for grouped data.

Here is my sample data

scala> sqlContext.sql("select * from test").show()

+---+---+
| id|num|
+---+---+
|  A|0.0|
|  A|1.0|
|  A|1.0|
|  A|1.0|
|  A|0.0|
|  A|1.0|
|  B|0.0|
|  B|1.0|
|  B|1.0|
+---+---+

Expected Answer:

+--------+
| Median |
+--------+
|   1    |
|   1    |
+--------+

I tried the following options, but had no luck:

1) The Hive function percentile, which works only for BigInt.

2) The Hive function percentile_approx, which does not return the expected value (it returns 0.25 instead of 1):

scala> sqlContext.sql("select percentile_approx(num, 0.5) from test group by id").show()

+----+
| _c0|
+----+
|0.25|
|0.25|
+----+

1 Answer


Simplest approach (requires Spark 2.0.1+; not an exact median)

As noted in the comments on the related question Find median in Spark SQL for double datatype columns, we can use percentile_approx to calculate the median in Spark 2.0.1+. To apply this to grouped data in Apache Spark, the query would look like:

// Build the sample data and register it as a temporary view
val df = Seq(("A", 0.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)).toDF("id", "num")
df.createOrReplaceTempView("df")

// Approximate median per group
spark.sql("select id, percentile_approx(num, 0.5) as median from df group by id order by id").show()

with the output being:

+---+------+
| id|median|
+---+------+
|  A|   1.0|
|  B|   1.0|
+---+------+

That said, this is an approximate value (as opposed to the exact median the question asks for).
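
If you prefer the DataFrame API to a SQL string, the same approximate aggregation can be written with expr; this is a minimal sketch assuming Spark 2.x, where percentile_approx is available as a SQL expression but not as a typed Column function:

import org.apache.spark.sql.functions.expr

// Same approximate median per group, expressed through the DataFrame API;
// percentile_approx is invoked via expr() because it has no Column helper here.
df.groupBy("id")
  .agg(expr("percentile_approx(num, 0.5)").as("median"))
  .orderBy("id")
  .show()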

Calculate exact median for grouped data

There are multiple approaches, so I'm sure others on SO can provide better or more efficient examples. But here's a code snippet that calculates the exact median for grouped data in Spark (verified in Spark 1.6 and Spark 2.1):

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
// In spark-shell the implicits needed for toDF are already in scope;
// in a standalone application, also add: import spark.implicits._

val rdd: RDD[(String, Double)] = sc.parallelize(Seq(("A", 1.0), ("A", 0.0), ("A", 1.0), ("A", 1.0), ("A", 0.0), ("A", 1.0), ("B", 0.0), ("B", 1.0), ("B", 1.0)))

// Exact median of an already sorted List
def median(inputList: List[Double]): Double = {
  val count = inputList.size
  if (count % 2 == 0) {
    // Even count: average the two middle values
    val l = count / 2 - 1
    val r = l + 1
    (inputList(l) + inputList(r)) / 2
  } else
    // Odd count: take the middle value
    inputList(count / 2)
}

// Group the values by id and sort each group's values
val setRDD = rdd.groupByKey()
val sortedListRDD = setRDD.mapValues(_.toList.sorted)

// Compute the median per group and show the result as a DataFrame
sortedListRDD.map(m => {
  (m._1, median(m._2))
}).toDF("id", "median_of_num").show()

with the output being:

+---+-------------+
| id|median_of_num|
+---+-------------+
|  A|          1.0|
|  B|          1.0|
+---+-------------+
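
Since the median helper expects an already sorted List, it can be sanity-checked locally on the sample values without touching Spark:

// Plain Scala check of the median helper (inputs must already be sorted)
assert(median(List(0.0, 0.0, 1.0, 1.0, 1.0, 1.0)) == 1.0) // even count: mean of the two middle values
assert(median(List(0.0, 1.0, 1.0)) == 1.0)                 // odd count: the middle value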

There are some caveats that I should call out as this likely isn't the most efficient implementation:

  • It currently uses groupByKey, which is not very performant because all values for a key are shuffled to a single executor. You may want to change this to a reduceByKey or aggregateByKey instead (more information in Avoid GroupByKey); see the sketch after this list.
  • It collects each group into a local Scala List and computes the median with a plain Scala function, so every group must fit in one executor's memory.
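
As a rough illustration of the first caveat, the grouping step could be rewritten with aggregateByKey, which builds the per-key lists with map-side combining instead of a plain groupByKey shuffle. This is a sketch only; all values for a key still end up on one executor, so for heavily skewed keys the percentile_approx route remains the safer choice:

// Hedged alternative to groupByKey: build each group's list with
// aggregateByKey, then sort and take the exact median as before.
val medianByKey = rdd
  .aggregateByKey(List.empty[Double])(
    (acc, v) => v :: acc,           // fold a value into the partition-local list
    (left, right) => left ::: right // merge lists coming from different partitions
  )
  .mapValues(values => median(values.sorted))

medianByKey.toDF("id", "median_of_num").show()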

This approach should work okay for smaller amounts of data, but if you have millions of rows per key, I would advise upgrading to Spark 2.0.1+ and using the percentile_approx approach.
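
If you take that route and need a tighter estimate, percentile_approx also accepts an optional third accuracy argument; larger values use more memory but produce results closer to the exact median. A small sketch, assuming Spark 2.1+ where the default accuracy is 10000:

// Approximate median with an explicitly raised accuracy (third argument)
spark.sql(
  "select id, percentile_approx(num, 0.5, 100000) as median from df group by id order by id"
).show()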

