I have a Spark 2.0 DataFrame example with the following structure:
id, hour, count
id1, 0, 12
id1, 1, 55
..
id1, 23, 44
id2, 0, 12
id2, 1, 89
..
id2, 23, 34
etc.
It contains 24 entries for each id (one for each hour of the day) and is ordered by id, hour using the orderBy function.
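For reference, a minimal sketch of how such a DataFrame could be built (column types are assumed here: String for id, Int for hour and count; the sample only stands in for the full 24 rows per id):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hourly-counts").getOrCreate()
import spark.implicits._

// Small sample standing in for the full data set (24 rows per id).
val example = Seq(
  ("id1", 0, 12), ("id1", 1, 55), ("id1", 23, 44),
  ("id2", 0, 12), ("id2", 1, 89), ("id2", 23, 34)
).toDF("id", "hour", "count")
 .orderBy($"id", $"hour")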
I have created an Aggregator groupConcat:
import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.expressions.Aggregator

def groupConcat(separator: String, columnToConcat: Int) =
  new Aggregator[Row, String, String] with Serializable {
    // Start from an empty buffer; reduce prefixes every value with the
    // separator, so finish strips the single leading separator.
    override def zero: String = ""
    override def reduce(b: String, a: Row) = b + separator + a.get(columnToConcat)
    // Partial buffers already carry their separators, so concatenate directly.
    override def merge(b1: String, b2: String) = b1 + b2
    override def finish(b: String) = b.substring(1)
    override def bufferEncoder: Encoder[String] = Encoders.STRING
    override def outputEncoder: Encoder[String] = Encoders.STRING
  }.toColumn
It lets me concatenate a column's values into a single string per group, producing this final DataFrame:
id, hourly_count
id1, 12:55:..:44
id2, 12:89:..:34
etc.
My question is: if I do

example.orderBy($"id", $"hour").groupBy("id").agg(groupConcat(":", 2) as "hourly_count")

does that guarantee that the hourly counts will be ordered correctly within their respective groups?
I read that this is not necessarily the case for RDDs (see Spark sort by key and then group by to get ordered iterable?), but maybe it is different for DataFrames?
If not, how can I work around it?
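One workaround I have seen suggested for similar cases (a sketch, not verified against this exact job; it assumes the Spark SQL built-ins collect_list, sort_array, struct, and concat_ws) is to make the ordering explicit in the data instead of relying on row order: collect the (hour, count) pairs per id, sort the array, and then join the counts:

import org.apache.spark.sql.functions._

// Collect (hour, count) pairs per id. sort_array orders the structs by their
// first field (hour), so the result does not depend on shuffle order.
// Extracting a field from an array of structs ($"hc.count") yields the array
// of count values, which concat_ws joins with ":".
val result = example
  .groupBy($"id")
  .agg(sort_array(collect_list(struct($"hour", $"count".cast("string").as("count")))) as "hc")
  .select($"id", concat_ws(":", $"hc.count") as "hourly_count")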