UDAF 强类型自定义函数

udaf强类型自定义函数需要继承Aggregator抽象类,并实现6个方法

 //输入类型
  case class People(name:String,age:Long)

  //缓存数据类型
  case class AgeBuffer(var sum:Long,var count:Long)

  //输入类型,缓存数据类型,返回精英
  class myAvg extends Aggregator[People,AgeBuffer,Double]{
    //缓存数据初始化
    override def zero: AgeBuffer = {AgeBuffer(0L,0L)}
    //分区内数据进行聚合
    override def reduce(b: AgeBuffer, a: People): AgeBuffer = {
      b.sum+=a.age
      b.count+=1
      b
    }

    //分区间的合并
    override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {
      b1.sum+=b2.sum
      b1.count+=b2.count
      b1
    }

    //返回计算结果
    override def finish(reduction: AgeBuffer): Double = {
      reduction.sum.toDouble/reduction.count
    }

    //导入包:import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession}
    //自定义引用类型 :用project
    override def bufferEncoder: Encoder[AgeBuffer] = {
      Encoders.product
    }

    //系统值类型
    override def outputEncoder: Encoder[Double] = {
      Encoders.scalaDouble
    }
  }

调用

调用的时候 要对创建好的自定义函数实例化, 并导入jar包,把数据转为dataset

把对象调用toColumn方法转化为列。

然后根据列名进行查询

def main(args: Array[String]): Unit = {
    //创建spark并设置app名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
    df.createOrReplaceTempView("people")
    import spark.implicits._
    //创建自定义函数对象
    val avg = new myAvg
    //导入包import spark.implicits._, 转成dataset
    val ds: Dataset[People] = df.as[People]
    //将自定义函数对象转成列
    val col: TypedColumn[People, Double] = avg.toColumn
    //在进行查询的时候 ,会将查出来的记录 People类型交行自定义函数进行处理
    ds.select(col).show()
  }

本文由 hcb 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论