Spark sql求平均年龄

//输入
{"name":"liming","age":11}
{"name":"LiLei","age":21}
{"name":"Tom","age":31}

  def main(args: Array[String]): Unit = {
    //创建spark并设置app名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
    df.createOrReplaceTempView("people")
    spark.sql("select avg(age) from people").show()
  }

//output

+--------+
|avg(age)|
+--------+
|    21.0|
+--------+

自带的函数avg使用很方便 ,如果没有自带的系统函数的话,要自定义实现udaf函数

UDAF 弱类型自定义函数

//自定义udaf函数,抽像函数实现父类的方法
  // 三个类型:输入类型,缓存类型,输出类型 +缓存初始化-更新-合并-计算
  class  myAvg extends UserDefinedAggregateFunction{
    //输入数据的类型,多个列的类型,
    override def inputSchema: StructType = {
      StructType(Array(StructField("age",IntegerType)))
    }
    //缓存的数据类型-缓存中间的计算结果
    override def bufferSchema: StructType = {
      StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
    }
    //聚合函数返回的数据类型
    override def dataType: DataType = DoubleType
    //稳定性。 当前程序是否稳定。一般都是true,相同的输入有相同的输出
    override def deterministic: Boolean = true
    //初始化, 缓存设置为初始状态
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      //设置缓存为年龄初始化为:0
      buffer(0)=0L
      //设置缓存为人数初始化为:0
      buffer(1)=0L
    }
    //更新缓存的数据。两个参数(缓存的数据,输入的内容)
    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      if(!buffer.isNullAt(0)){ //判断是否已经初始化
        buffer(0)=buffer.getLong(0)+input.getInt(0)
        //      buffer(0)=buffer.getAs[Long](0)+input.getInt(0) 也可以写成这样的写法
        buffer(1)=buffer.getLong(1)+1
      }
    }
    //分区间的合并
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)
      buffer2(1)=buffer1.getLong(1)+buffer2.getLong(1)
    }
    //计算的业务逻辑
    override def evaluate(buffer: Row): Any = {
      buffer.getDouble(0)/buffer.getLong(1)
    }
  }

如何使用自定义函数

创建自定义函数对象。 并调用 。

  def main(args: Array[String]): Unit = {
    //创建spark并设置app名称
    val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
//    spark.udf.register("changeName",(name:String)=>{"new" +name})
    df.createOrReplaceTempView("people")
    //创建自定义函数对象
    val avg = new myAvg
    //注册自定义函数
    spark.udf.register("myAvg",avg)
    spark.sql("select myAvg(name),age from people").show()
//    spark.sql("select changeName(name),age from people").show()
  }

//输出结果
+--------+
|avg(age)|
+--------+
|    21.0|
+--------+


本文由 hcb 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论