
java - Apache Spark insert multiple rows into the DataFrame

First of all, I'm bound to Java 1.7 and Spark 1.6.

I have a lot of columns and data, but let's follow a simple example. Say I have this simple table (DataFrame):

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
|   2|      B|
|   3|      C|
+----+-------+

For every cell I call a custom UDF to do the calculations that are needed. One of the requirements is to create and append N new rows after each row (or after each row that has some particular value).

So, it's like:

+----+-------+
|  id|   name|
+----+-------+
|   1|      A| --> create 1 new row (based on the UDF calculations)
|   2|      B| --> create 2 new rows (based on the UDF calculations)
|   3|      C|
+----+-------+

Expected result is:

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
|    |  (new)|
|   2|      B|
|    |  (new)|
|    |  (new)|
|   3|      C|
+----+-------+

My question is: what is the best/correct way to do this?
The issues I've faced so far: the approach via dataFrame.foreach(new Function1<Row, BoxedUnit>() {...}) is painful. Function1 is not a functional interface, there is no Java 8, so I have to implement the whole interface, the code structure gets complicated, and I keep running into Serializable errors.
Personally I'm not sure that foreach is the best way to do it, but I have to iterate over the current DataFrame somehow.
Also, if I understand correctly, I will always have to apply unionAll to append the new rows.
Maybe there are better ways to do this through Spark SQL, or by transforming into an RDD, etc.
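
For reference, the RDD route hinted at here would look roughly like the sketch below. This is a minimal sketch only, assuming the DataFrame is called baseDf with an (id, name) schema where id is an integer, an existing sqlContext, and a hypothetical serializable helper makeExtraNames that computes the extra names for one input row:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;

// Emit the original row plus any generated rows
// (in Spark 1.x, FlatMapFunction.call() returns an Iterable).
JavaRDD<Row> expanded = baseDf.javaRDD().flatMap(new FlatMapFunction<Row, Row>() {
    @Override
    public Iterable<Row> call(Row row) {
        List<Row> out = new ArrayList<Row>();
        out.add(row); // keep the original row
        for (String name : makeExtraNames(row)) { // hypothetical helper
            out.add(RowFactory.create(row.getInt(0), name));
        }
        return out;
    }
});
// Rebuild a DataFrame with the original schema.
DataFrame result = sqlContext.createDataFrame(expanded, baseDf.schema());

Note that the anonymous FlatMapFunction (and everything it captures) must be serializable, which is exactly where the Serializable errors mentioned above tend to come from.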



1 Answer


Answering my own question (thanks @mck for the idea of using explode()).

So, let's say the initial df is:

DataFrame baseDf = ...

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
|   2|      B|
+----+-------+

Create a new 'temp' column for the UDF result and save it into a new, separate df:

DataFrame df1 = baseDf.withColumn("temp", functions.callUDF("genRows", functions.col("id"), functions.col("name"))); // "genRows" = the custom UDF (name assumed; registration sketched below)

+----+-------+-----+
|  id|   name| temp|
+----+-------+-----+
|   1|      A|     |
|   2|      B|     |
+----+-------+-----+

From the UDF, return a list (or a map):

+----+-------+------+
|  id|   name|  temp|
+----+-------+------+
|   1|      A| [C,D]|
|   2|      B| [E,F]|
+----+-------+------+
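
For completeness, registering such a UDF in Spark 1.6 / Java 7 could look roughly like this. This is a sketch only; the name "genRows" and the per-row logic are assumptions, not the original code:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.types.DataTypes;

// Register a UDF that returns an array<string> column.
sqlContext.udf().register("genRows", new UDF2<Integer, String, List<String>>() {
    @Override
    public List<String> call(Integer id, String name) {
        // Hypothetical logic: compute the extra names for this row.
        // Return an empty list for rows that should not spawn new rows.
        return Arrays.asList("C", "D");
    }
}, DataTypes.createArrayType(DataTypes.StringType));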

Apply explode() to the temp column and select the result into a new DataFrame:

DataFrame unfolded = df1.select(functions.col("id"), functions.explode(functions.col("temp")).as("name"));

+----+-------+
|  id|   name|
+----+-------+
|   1|      C|
|   1|      D|
|   2|      E|
|   2|      F|
+----+-------+
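
Note that explode() emits no output rows for a null or empty array, so any row whose UDF returns an empty list simply contributes nothing to unfolded. That covers the "only after rows that have some kind of value" case from the question.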

Now, since unfolded and baseDf have the same structure (unionAll in Spark 1.6 resolves columns by position, so the column order must match), we can apply unionAll and then sort or filter as needed:

baseDf = baseDf.unionAll(unfolded).sort("id", "name");

+----+-------+
|  id|   name|
+----+-------+
|   1|      A|
|   1|      C|
|   1|      D|
|   2|      B|
|   2|      E|
|   2|      F|
+----+-------+

The new rows have been appended.

