I spent some time trying to make this work (specially how deserialize the data properly but it looks like you already cover this) ... UPDATED
//Define function to convert from GenericRecord to Row
def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
import scala.collection.JavaConversions._
for (field <- record.getSchema.getFields) {
objectArray(field.pos) = record.get(field.pos)
}
new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
}
//Inside your stream foreachRDD
val yourGenericRecordRDD = ...
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))
var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])
As you see, I am using a SchemaConverter to get the dataframe structure from the schema that you used to deserialize (this could be more painful with schema registry). For this you need the following dependency
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>3.2.0</version>
</dependency>
you will need to change your spark version depending on yours.
UPDATE: the code above only works for flat avro schemas.
For nested structures I used something different. You can copy the class SchemaConverters, it has to be inside of com.databricks.spark.avro
(it uses some protected classes from the databricks package) or you can try to use the spark-bigquery dependency. The class will not be accessible by default, so you will need to create a class inside a package com.databricks.spark.avro
to access the factory method.
package com.databricks.spark.avro
import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
class SchemaConverterUtils {
def converterSql(schema : Schema, sqlType : StructType) = {
createConverterToSQL(schema, sqlType)
}
}
After that you should be able to convert the data like
val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
///
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
...
val rowRdd = genericRecordRDD.flatMap(record => {
Try(converter(record).asInstanceOf[Row]).toOption
})
//To DataFrame
val df = sqlContext.createDataFrame(rowRdd, sqlType)
与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…