Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

scala - Process all columns / the entire row in a Spark UDF

For a dataframe containing a mix of string and numeric datatypes, the goal is to create a new features column that is a minhash of all of them.

While this could be done by converting the dataframe to an RDD (dataframe.rdd), that is expensive when the next step is simply to convert the RDD back to a dataframe.

So is there a way to do a udf along the following lines:

val wholeRowUdf = udf( (row: Row) =>  computeHash(row))

Row is not a Spark SQL datatype, of course, so this would not work as shown.

Update/clarification: I realize it is easy to create a full-row UDF that runs inside withColumn. What is not so clear is what can be used inside a Spark SQL statement:

val featurizedDf = spark.sql("""select wholeRowUdf( what goes here? ) as features
                              from mytable""")

1 Answer


"Row is not a spark sql datatype of course - so this would not work as shown."

Row can in fact be used as a UDF argument: wrap all the columns, or a selection of them, with the built-in struct function and pass the resulting struct column to the udf.

First I define a dataframe

import spark.implicits._ // needed for toDF; already in scope inside spark-shell
val df = Seq(
  ("a", "b", "c"),
  ("a1", "b1", "c1")
).toDF("col1", "col2", "col3")
//    +----+----+----+
//    |col1|col2|col3|
//    +----+----+----+
//    |a   |b   |c   |
//    |a1  |b1  |c1  |
//    +----+----+----+

Then I define a function that joins all the elements of a row into one string separated by ", " (it stands in for your computeHash function)

import org.apache.spark.sql.Row
def concatFunc(row: Row) = row.mkString(", ")
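
If the row function should produce a MinHash-style signature, as the computeHash in the question suggests, one possible shape is sketched below. This is only an illustration: RowMinHash, minHashSignature and numHashes are hypothetical names, and seeding MurmurHash3.stringHash with different seeds is an assumed stand-in for whatever hash family the real computeHash uses. Each field value is treated as one token, so column order does not affect the result.

```scala
import scala.util.hashing.MurmurHash3

object RowMinHash {
  // Hypothetical stand-in for computeHash: a MinHash signature over a row's values.
  // Each value is rendered as text (null becomes "null") and treated as one token.
  def minHashSignature(values: Seq[Any], numHashes: Int = 4): Seq[Int] = {
    val tokens = values.map(v => if (v == null) "null" else v.toString)
    (0 until numHashes).map { seed =>
      // Assumption: one seeded MurmurHash3 per signature slot; keep the minimum per seed.
      if (tokens.isEmpty) Int.MaxValue
      else tokens.map(t => MurmurHash3.stringHash(t, seed)).min
    }
  }
}
```

Inside a UDF this could be called as udf((row: Row) => RowMinHash.minHashSignature(row.toSeq)), which would yield an array-of-int column.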

Then I use it in udf function

import org.apache.spark.sql.functions._
val combineUdf = udf((row: Row) => concatFunc(row))

Finally I call the udf inside withColumn, using the built-in struct function to combine the selected columns into one struct column that is passed to the udf

df.withColumn("concatenated", combineUdf(struct(col("col1"), col("col2"), col("col3")))).show(false)
//    +----+----+----+------------+
//    |col1|col2|col3|concatenated|
//    +----+----+----+------------+
//    |a   |b   |c   |a, b, c     |
//    |a1  |b1  |c1  |a1, b1, c1  |
//    +----+----+----+------------+

So you can see that a Row can be used to pass the whole row as a single argument

You can even pass all columns in a row at once

val columns = df.columns
df.withColumn("concatenated", combineUdf(struct(columns.map(col): _*)))
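
Since the question is about a mix of string and numeric columns, here is a minimal self-contained sketch of the same idea on such a dataframe. WholeRowUdfExample, featurize and the column names are made up for illustration, and computeHash is a hypothetical stand-in (a plain String hashCode, not a real minhash). It assumes a local SparkSession is acceptable for trying things out.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, struct, udf}

object WholeRowUdfExample {
  // Hypothetical stand-in for computeHash: joins the fields as text and hashes the result.
  // Row.mkString renders null fields as "null", so nullable columns do not throw here.
  def computeHash(row: Row): Int = row.mkString("|").hashCode

  // Adds a "features" column computed from every column of df, whatever their types.
  def featurize(df: DataFrame): DataFrame = {
    val wholeRowUdf = udf((row: Row) => computeHash(row))
    df.withColumn("features", wholeRowUdf(struct(df.columns.map(col): _*)))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("whole-row-udf").getOrCreate()
    import spark.implicits._

    // Illustrative data: one string column and two numeric columns.
    val df = Seq(("a", 1, 2.5), ("b", 2, 3.5)).toDF("name", "count", "score")
    featurize(df).show(false)
    spark.stop()
  }
}
```

Inside the UDF the fields keep their original types (row.getString(0), row.getInt(1) and so on), so a real hash function can treat strings and numbers differently if it needs to.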

Update

You can achieve the same with SQL queries too; you just need to register the udf under a name first

df.createOrReplaceTempView("tempview")
// spark is the SparkSession, as in the question
spark.udf.register("combineUdf", combineUdf)
spark.sql("select *, combineUdf(struct(`col1`, `col2`, `col3`)) as concatenated from tempview")

It will give you the same result as above

If you don't want to hardcode the column names, you can build the backtick-quoted column list as a string from whichever columns you need and interpolate it into the query

val columnList = df.columns.map(x => "`" + x + "`").mkString(",")
spark.sql(s"select *, combineUdf(struct($columnList)) as concatenated from tempview")
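
For the "what goes here?" in the question, a shorter option may be to let Spark SQL expand the star inside struct. The sketch below assumes that struct(*) expands to every column of the view (the same pattern as the commonly used to_json(struct(*))). WholeRowSqlExample and concatenated are hypothetical names, and the function is registered directly from a Scala lambda instead of an existing udf value.

```scala
import org.apache.spark.sql.{Row, SparkSession}

object WholeRowSqlExample {
  // Returns the value of the "concatenated" column for each row of the sample view.
  def concatenated(spark: SparkSession): Array[String] = {
    import spark.implicits._

    Seq(("a", "b", "c"), ("a1", "b1", "c1"))
      .toDF("col1", "col2", "col3")
      .createOrReplaceTempView("tempview")

    // Register a Row => String function under a SQL-visible name.
    spark.udf.register("combineUdf", (row: Row) => row.mkString(", "))

    // Assumption: struct(*) packs all columns of tempview into one struct argument.
    spark.sql("select combineUdf(struct(*)) as concatenated from tempview")
      .as[String]
      .collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("whole-row-sql").getOrCreate()
    concatenated(spark).foreach(println)
    spark.stop()
  }
}
```

This avoids building the column list by hand, at the cost of always passing every column; the explicit list is still the way to go when only some columns should feed the hash.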

I hope the answer is helpful

