Include GraphFrames (the latest supported Spark version is 2.1, but it should work with 2.2 as well; if you use a newer version you'll have to build your own with the 2.3 patch), replacing XXX with your Spark version and YYY with your Scala version (for example 0.5.0-spark2.1-s_2.11 for Spark 2.1 with Scala 2.11):
spark.jars.packages graphframes:graphframes:0.5.0-sparkXXX-s_YYY
Create the example data and explode the keys, so that every key/value pair becomes an edge:
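// toDF and $"..." need spark.implicits._ (imported automatically in spark-shell)
import org.apache.spark.sql.functions._
val df = Seq(
  (Seq("k1", "k2"), "v1"), (Seq("k2"), "v2"),
  (Seq("k3", "k2"), "v3"), (Seq("k4"), "v4")
).toDF("key", "value")
// One edge per (key, value) pair: src = key, dst = value
val edges = df.select(
  explode($"key") as "src", $"value" as "dst")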
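Outside Spark, the explode step amounts to a flatMap over the rows. The following plain-Scala sketch (no Spark involved; the names rows and edgeList are illustrative only, not part of any API) builds the same edge list from the same sample data:

```scala
// Sample rows: (keys, value), mirroring the DataFrame above
val rows: Seq[(Seq[String], String)] = Seq(
  (Seq("k1", "k2"), "v1"), (Seq("k2"), "v2"),
  (Seq("k3", "k2"), "v3"), (Seq("k4"), "v4")
)

// Counterpart of explode($"key") as "src", $"value" as "dst":
// one (src, dst) pair for every key in a row
val edgeList: Seq[(String, String)] =
  rows.flatMap { case (keys, value) => keys.map(k => (k, value)) }
```

A row with two keys therefore contributes two edges, which is what later lets rows that share a key end up connected.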
Convert to a GraphFrame:
import org.graphframes._
val gf = GraphFrame.fromEdges(edges)
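Because fromEdges derives the vertices from the distinct ids in the src and dst columns, keys and values end up in a single id space, so a key must never share a name with a value. A plain-Scala sketch of that vertex derivation, assuming the edge list from the explode step (edgeList and vertexIds are illustrative names):

```scala
// Edge list (src = key, dst = value), as produced by the explode step
val edgeList: Seq[(String, String)] = Seq(
  ("k1", "v1"), ("k2", "v1"), ("k2", "v2"),
  ("k3", "v3"), ("k2", "v3"), ("k4", "v4")
)

// Vertex ids: every distinct id that appears as either src or dst
val vertexIds: Seq[String] =
  (edgeList.map(_._1) ++ edgeList.map(_._2)).distinct
```

With only an id column on the vertices, the later startsWith("v") filter is what tells value vertices apart from key vertices.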
Set the checkpoint directory (if it is not already set):
import org.apache.spark.sql.SparkSession
val path: String = ???          // your checkpoint directory
val spark: SparkSession = ???   // your existing SparkSession
spark.sparkContext.setCheckpointDir(path)
Find connected components:
val components = gf.connectedComponents.setAlgorithm("graphx").run()
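To see what connectedComponents computes, here is a plain-Scala stand-in using union-find (disjoint sets) on the same edges. This is a swapped-in technique for illustration only: GraphX arrives at the same partition in a distributed way by propagating the smallest vertex id through each component, and the UnionFind class below is a hypothetical helper, not part of GraphFrames or GraphX.

```scala
import scala.collection.mutable

// Hypothetical minimal union-find over string vertex ids (illustration only)
class UnionFind {
  private val parent = mutable.Map.empty[String, String]

  // Returns the root of x's set, compressing the path along the way
  def find(x: String): String = {
    val p = parent.getOrElseUpdate(x, x)
    if (p == x) x
    else {
      val root = find(p)
      parent(x) = root
      root
    }
  }

  // Merges the sets containing a and b
  def union(a: String, b: String): Unit = {
    val ra = find(a)
    val rb = find(b)
    if (ra != rb) parent(ra) = rb
  }
}

// Edges from the explode step: (src = key, dst = value)
val edgeList: Seq[(String, String)] = Seq(
  ("k1", "v1"), ("k2", "v1"), ("k2", "v2"),
  ("k3", "v3"), ("k2", "v3"), ("k4", "v4")
)

val uf = new UnionFind
edgeList.foreach { case (src, dst) => uf.union(src, dst) }

// Group the value vertices by the root of their component
val groups: Map[String, Set[String]] =
  edgeList.map(_._2).distinct.groupBy(v => uf.find(v)).map { case (root, vs) => root -> vs.toSet }
```

Rows that share a key, directly or through a chain of keys, land in the same group: v1, v2 and v3 are connected through k2, while v4 stays on its own.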
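Keep only the value vertices (their ids start with "v") and join the result with the input data:
val result = components
  .where($"id".startsWith("v"))  // keep value vertices only
  .toDF("value", "group")        // rename id -> value, component -> group
  .join(df, Seq("value"))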
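The filter-and-join step can be mirrored in plain Scala. The component ids below are made up and only stand in for the component column returned by connectedComponents; the sketch shows that only ids starting with "v" are kept and then matched to the input rows by value (components, rows and result are illustrative names).

```scala
// Made-up component ids per vertex, standing in for connectedComponents output
val components: Map[String, Long] = Map(
  "k1" -> 0L, "k2" -> 0L, "k3" -> 0L, "v1" -> 0L, "v2" -> 0L, "v3" -> 0L,
  "k4" -> 1L, "v4" -> 1L
)

// Input rows: (keys, value)
val rows: Seq[(Seq[String], String)] = Seq(
  (Seq("k1", "k2"), "v1"), (Seq("k2"), "v2"),
  (Seq("k3", "k2"), "v3"), (Seq("k4"), "v4")
)

// Keep value vertices only, then inner-join with the rows on value
val valueComponents: Map[String, Long] =
  components.filter { case (id, _) => id.startsWith("v") }

val result: Seq[(String, Long, Seq[String])] =
  rows.flatMap { case (keys, value) =>
    valueComponents.get(value).map(group => (value, group, keys))
  }
```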
Check result:
result.show
// +-----+------------+--------+
// |value| group| key|
// +-----+------------+--------+
// | v3|489626271744|[k3, k2]|
// | v2|489626271744| [k2]|
// | v4|532575944704| [k4]|
// | v1|489626271744|[k1, k2]|
// +-----+------------+--------+