UDAF: Strongly Typed User-Defined Aggregate Function
A strongly typed UDAF extends the abstract class Aggregator and implements its six methods: zero, reduce, merge, finish, bufferEncoder, and outputEncoder.
// Required imports
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// Input type
case class People(name: String, age: Long)
// Buffer type that holds the intermediate aggregation state
case class AgeBuffer(var sum: Long, var count: Long)
// Type parameters: input type, buffer type, output type
class myAvg extends Aggregator[People, AgeBuffer, Double] {
  // Initialize the buffer
  override def zero: AgeBuffer = AgeBuffer(0L, 0L)
  // Aggregate the records within a partition
  override def reduce(b: AgeBuffer, a: People): AgeBuffer = {
    b.sum += a.age
    b.count += 1
    b
  }
  // Merge buffers across partitions
  override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
    b1.sum += b2.sum
    b1.count += b2.count
    b1
  }
  // Compute the final result from the merged buffer
  override def finish(reduction: AgeBuffer): Double = {
    reduction.sum.toDouble / reduction.count
  }
  // Buffer encoder: a custom case class uses Encoders.product
  override def bufferEncoder: Encoder[AgeBuffer] = {
    Encoders.product
  }
  // Output encoder: a built-in value type uses the corresponding Scala encoder
  override def outputEncoder: Encoder[Double] = {
    Encoders.scalaDouble
  }
}
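To see how the four computation methods cooperate, here is a hypothetical manual trace that drives the aggregator's lifecycle by hand (the sample names and ages are made up for illustration; it assumes the People, AgeBuffer, and myAvg definitions above are on the classpath):

```scala
// Illustration only: simulate what Spark does across two partitions
val agg = new myAvg
// Partition 1 holds two records, partition 2 holds one
val p1 = agg.reduce(agg.reduce(agg.zero, People("a", 20L)), People("b", 30L)) // AgeBuffer(50, 2)
val p2 = agg.reduce(agg.zero, People("c", 40L))                               // AgeBuffer(40, 1)
// Spark merges the per-partition buffers, then finishes
val merged = agg.merge(p1, p2)                                                // AgeBuffer(90, 3)
println(agg.finish(merged))                                                   // 90.0 / 3 = 30.0
```

This is exactly the contract Spark relies on: reduce folds each input record into a partition-local buffer, merge combines buffers from different partitions, and finish maps the final buffer to the output value.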
Invocation
To invoke the function, instantiate the custom aggregator, import the required packages (including spark.implicits._), and convert the DataFrame to a Dataset.
Then call toColumn on the instance to turn it into a typed column,
and select that column in the query.
// Required imports
// import org.apache.spark.SparkConf
// import org.apache.spark.sql.{DataFrame, Dataset, SparkSession, TypedColumn}
def main(args: Array[String]): Unit = {
  // Create the SparkSession and set the app name
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
  df.createOrReplaceTempView("people")
  import spark.implicits._
  // Instantiate the custom aggregator
  val avg = new myAvg
  // Convert the DataFrame to a Dataset (requires import spark.implicits._)
  val ds: Dataset[People] = df.as[People]
  // Turn the aggregator instance into a typed column
  val col: TypedColumn[People, Double] = avg.toColumn
  // During the query, each People record is passed to the custom aggregator
  ds.select(col).show()
}
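As a concrete illustration, suppose people.json contains the following made-up records, one JSON object per line as Spark's JSON reader expects:

```json
{"name": "zhangsan", "age": 20}
{"name": "lisi", "age": 30}
{"name": "wangwu", "age": 40}
```

With this input, ds.select(col).show() would print a single row containing the average age, (20 + 30 + 40) / 3 = 30.0.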