Answering my own question (thanks to @mck for the idea of using explode()).
So, let's say the initial df is:
DataFrame baseDf = ...
+----+-------+
| id| name|
+----+-------+
| 1| A|
+----+-------+
| 2| B|
+----+-------+
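Create a new 'temp' column for the UDF result and save it into a new, separate df:
// "myUdf" is a placeholder name for your own registered, list-returning UDF
DataFrame df1 = baseDf.withColumn("temp", functions.callUDF("myUdf", functions.col("name")));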
+----+-------+-----+
| id| name| temp|
+----+-------+-----+
| 1| A| |
+----+-------+-----+
| 2| B| |
+----+-------+-----+
Have the UDF return a list (or a map) for each row:
+----+-------+------+
| id| name| temp|
+----+-------+------+
| 1| A| [C,D]|
+----+-------+------+
| 2| B| [E,F]|
+----+-------+------+
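To make the shape of the intermediate data concrete, here is a plain-Java model (no Spark dependency) of what withColumn plus a list-returning UDF produce. It is only an illustration under stated assumptions: the record types and the UDF body, a fixed lookup A→[C,D], B→[E,F], are hypothetical and were chosen just to reproduce the table above. In Spark itself the UDF would need to be declared with an array (or map) return type, since explode() only accepts array or map columns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of baseDf.withColumn("temp", <list-returning UDF>); no Spark involved.
final class WithListColumnSketch {
    // One row of baseDf (hypothetical stand-in for a DataFrame row).
    record Row(int id, String name) {}

    // One row of df1: the original columns plus the list-valued "temp" column.
    record RowWithTemp(int id, String name, List<String> temp) {}

    // Hypothetical lookup that reproduces the example table.
    static final Map<String, List<String>> LOOKUP =
            Map.of("A", List.of("C", "D"), "B", List.of("E", "F"));

    // Hypothetical UDF body: names missing from the lookup get an empty list.
    static final Function<String, List<String>> UDF =
            name -> LOOKUP.getOrDefault(name, List.of());

    // Like withColumn: every input row is kept and gains one extra column.
    static List<RowWithTemp> withTemp(List<Row> rows, Function<String, List<String>> udf) {
        List<RowWithTemp> out = new ArrayList<>();
        for (Row r : rows) {
            out.add(new RowWithTemp(r.id(), r.name(), udf.apply(r.name())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Row> baseDf = List.of(new Row(1, "A"), new Row(2, "B"));
        withTemp(baseDf, UDF).forEach(System.out::println);
    }
}
```

Note that the row count does not change at this stage; only the list-valued column is added.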
Apply explode() to the temp column, which emits one row per list element, and select the result into a new DataFrame:
DataFrame unfolded = df1.select(functions.col("id"), functions.explode(functions.col("temp")).as("name"));
+----+-------+
| id| name|
+----+-------+
| 1| C|
+----+-------+
| 1| D|
+----+-------+
| 2| E|
+----+-------+
| 2| F|
+----+-------+
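The explode() step behaves like a flatMap over the rows. The following plain-Java sketch (again no Spark; the record types are hypothetical stand-ins for the DataFrame rows) mirrors the select above: it keeps id, emits one row per list element and puts that element in the name column.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java model of explode(): each element of the list column becomes its own output row.
final class ExplodeSketch {
    // Hypothetical stand-ins for rows of df1 and of unfolded.
    record RowWithTemp(int id, String name, List<String> temp) {}
    record Row(int id, String name) {}

    // Mirrors df1.select(col("id"), explode(col("temp")).as("name")):
    // keep "id", emit one row per list element, and store that element as "name".
    // A row whose list is empty contributes no output rows.
    static List<Row> explode(List<RowWithTemp> rows) {
        return rows.stream()
                .flatMap(r -> r.temp().stream().map(item -> new Row(r.id(), item)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<RowWithTemp> df1 = List.of(
                new RowWithTemp(1, "A", List.of("C", "D")),
                new RowWithTemp(2, "B", List.of("E", "F")));
        explode(df1).forEach(System.out::println);
    }
}
```

The empty-list behavior matches Spark's explode(), which drops rows whose array is empty or null; Spark 2.2+ also provides explode_outer if such rows need to be kept.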
Now, since unfolded and baseDf have the same structure (unionAll matches columns by position, not by name, so both must have id and name in the same order), we can apply unionAll and then sort or filter as needed:
baseDf = baseDf.unionAll(unfolded).sort("id", "name");
+----+-------+
| id| name|
+----+-------+
| 1| A|
+----+-------+
| 1| C|
+----+-------+
| 1| D|
+----+-------+
| 2| B|
+----+-------+
| 2| E|
+----+-------+
| 2| F|
+----+-------+
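The final step, modeled the same way in plain Java (hypothetical Row record, no Spark), is a concatenation followed by a two-key sort. Unlike SQL UNION, unionAll keeps duplicate rows, so the sketch concatenates without de-duplicating.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of baseDf.unionAll(unfolded).sort("id", "name"); no Spark involved.
final class UnionSortSketch {
    // Hypothetical stand-in for a row with columns (id, name).
    record Row(int id, String name) {}

    static List<Row> unionAllSorted(List<Row> base, List<Row> unfolded) {
        // unionAll: plain concatenation, duplicates are kept.
        List<Row> out = new ArrayList<>(base);
        out.addAll(unfolded);
        // sort("id", "name"): ascending by id, ties broken by name.
        out.sort(Comparator.comparingInt(Row::id).thenComparing(Row::name));
        return out;
    }

    public static void main(String[] args) {
        List<Row> baseDf = List.of(new Row(1, "A"), new Row(2, "B"));
        List<Row> unfolded = List.of(
                new Row(1, "C"), new Row(1, "D"), new Row(2, "E"), new Row(2, "F"));
        unionAllSorted(baseDf, unfolded).forEach(System.out::println);
    }
}
```

If duplicates are unwanted, they would have to be removed explicitly after the union (in Spark, for example with distinct()).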
The new rows have been added to baseDf.