UDF Functions: 1-to-1

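A user-defined function (UDF) is one-to-one: each input value produces exactly one output value.

A custom function must be registered before it can be called from SQL.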
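A UDF such as `changeName` is an ordinary Scala function of type `String => String`: one value in, one value out. Because of that, its logic can be checked on plain Scala values before it is registered with Spark. This is a minimal sketch, and the object name `ChangeNameCheck` is hypothetical:

```scala
object ChangeNameCheck {
  // The same one-in, one-out function that the example registers as "changeName"
  val changeName: String => String = name => "new" + name

  // Apply it element by element: exactly one output per input
  def applyAll(names: Seq[String]): Seq[String] = names.map(changeName)

  def main(args: Array[String]): Unit =
    println(applyAll(Seq("liming", "LiLei", "Tom"))) // prints List(newliming, newLiLei, newTom)
}
```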

Implementation with an anonymous function

// Input data (people.json, one JSON object per line)
{"name":"liming","age":11}
{"name":"LiLei","age":21}
{"name":"Tom","age":31}

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  // Create the Spark configuration and set the app name
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
  // Register the UDF, implemented as an anonymous function String => String
  spark.udf.register("changeName", (name: String) => "new" + name)
  df.createOrReplaceTempView("people")
  spark.sql("select changeName(name), age from people").show()
  spark.stop()
}
// Output
+--------------------+---+
|UDF:changeName(name)|age|
+--------------------+---+
|           newliming| 11|
|            newLiLei| 21|
|              newTom| 31|
+--------------------+---+
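Registering a name with `spark.udf.register` is what makes `changeName` callable from SQL strings. If the function is only needed in the DataFrame API, `org.apache.spark.sql.functions.udf` wraps the same lambda as a column expression without registering any name. This is a sketch assuming Spark 2.x or 3.x and the same input file; `UdfDataFrameDemo` and `transform` are illustrative names, not part of the original example:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

object UdfDataFrameDemo {
  // Wrap the same anonymous function as a column-level UDF (no SQL name is registered)
  val changeName: UserDefinedFunction = udf((name: String) => "new" + name)

  // Apply the UDF to the "name" column and keep "age" unchanged
  def transform(df: DataFrame): DataFrame =
    df.select(changeName(col("name")).as("name"), col("age"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UdfDataFrameDemo").master("local[*]").getOrCreate()
    val df = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
    transform(df).show()
    spark.stop()
  }
}
```

The `.as("name")` alias only keeps the output column name stable; without it Spark generates a column name itself.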

Custom Aggregation Function

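Compute the average age with an accumulator by defining a custom class that extends AccumulatorV2[IN, OUT], here AccumulatorV2[Int, Double]: ages (Int) go in, and the average (Double) comes out.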
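The accumulator keeps a partial (sum, count) pair in each task, and Spark merges those partials on the driver. That merge logic can be sketched in plain Scala with no Spark dependency; `AvgPartial` and `AvgPartialDemo` are hypothetical names. Note the `toDouble` before dividing: dividing two integers truncates, so 37 / 3 would give 12 instead of roughly 12.33. Returning 0.0 for an empty input is an assumed convention, chosen to avoid dividing by zero.

```scala
// A partial aggregate: running sum of ages and number of people seen
final case class AvgPartial(sum: Long, count: Long) {
  // Counterpart of AccumulatorV2.add: fold in one age
  def add(age: Int): AvgPartial = AvgPartial(sum + age, count + 1)
  // Counterpart of AccumulatorV2.merge: combine two partials
  def merge(other: AvgPartial): AvgPartial = AvgPartial(sum + other.sum, count + other.count)
  // Counterpart of AccumulatorV2.value: convert before dividing to avoid integer division
  def average: Double = if (count == 0) 0.0 else sum.toDouble / count
}

object AvgPartialDemo {
  val zero: AvgPartial = AvgPartial(0L, 0L)

  // Simulate partitions: aggregate each one locally, then merge the partials
  def averageOf(partitions: Seq[Seq[Int]]): Double =
    partitions
      .map(_.foldLeft(zero)(_ add _))
      .foldLeft(zero)(_ merge _)
      .average

  def main(args: Array[String]): Unit =
    println(averageOf(Seq(Seq(12, 12), Seq(13))))
}
```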

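import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  // Create the Spark configuration and set the app name
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  // Create the SparkContext, the entry point for submitting a Spark application
  val sc: SparkContext = new SparkContext(conf)
  // Test data: create an RDD of (name, age) pairs
  val rdd: RDD[(String, Int)] = sc.makeRDD(List(("tom", 12), ("Lilei", 12), ("Jerry", 13)))

  // Create the custom accumulator and register it with the SparkContext
  val myAcc = new MyAccumulator
  sc.register(myAcc)

  // Each task adds ages to its own copy; Spark merges the copies back on the driver
  rdd.foreach { case (_, age) =>
    myAcc.add(age)
  }
  println("-----" + myAcc.value)
  // Stop the SparkContext
  sc.stop()
}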

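import org.apache.spark.util.AccumulatorV2

// Accumulator that takes ages (Int) and returns their average (Double)
class MyAccumulator extends AccumulatorV2[Int, Double] {
  var aggSum = 0   // sum of all ages added so far
  var countSum = 0 // number of ages added so far

  // Whether this accumulator is in its initial (zero) state
  override def isZero: Boolean = aggSum == 0 && countSum == 0

  // Create a new accumulator holding a copy of this one's state
  override def copy(): AccumulatorV2[Int, Double] = {
    val myAcc = new MyAccumulator
    myAcc.aggSum = this.aggSum
    myAcc.countSum = this.countSum
    myAcc
  }

  // Reset to the zero state; Spark first copies the accumulator, then resets the copy
  override def reset(): Unit = {
    aggSum = 0
    countSum = 0
  }

  // Add one age: accumulate the value and increment the head count
  override def add(age: Int): Unit = {
    aggSum += age
    countSum += 1
  }

  // Merge another accumulator's partial result into this one
  override def merge(other: AccumulatorV2[Int, Double]): Unit = other match {
    case ac: MyAccumulator =>
      aggSum += ac.aggSum
      countSum += ac.countSum
    case _ =>
      throw new UnsupportedOperationException(
        s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  // Final result; convert to Double first so the division is not truncated
  override def value: Double =
    if (countSum == 0) 0.0 else aggSum.toDouble / countSum
}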
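An accumulator is updated from RDD actions and read back on the driver. When the average is wanted inside a Spark SQL query instead, Spark 3.0 and later can register a typed `Aggregator` as an SQL aggregate function through `functions.udaf`. The sketch below follows the pattern of the Spark SQL user-defined aggregate functions guide; `AvgBuffer`, `MyAverage`, `UdafDemo` and the SQL name `myAverage` are illustrative names. The input type is `Long` on the assumption that `spark.read.json` infers the integer `age` field as `bigint`.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}
import org.apache.spark.sql.expressions.Aggregator

// Aggregation buffer: running sum and count
case class AvgBuffer(var sum: Long, var count: Long)

object MyAverage extends Aggregator[Long, AvgBuffer, Double] {
  // Initial (zero) buffer
  def zero: AvgBuffer = AvgBuffer(0L, 0L)
  // Fold one input value into the buffer
  def reduce(b: AvgBuffer, age: Long): AvgBuffer = { b.sum += age; b.count += 1; b }
  // Combine two partial buffers from different partitions
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer = { b1.sum += b2.sum; b1.count += b2.count; b1 }
  // Produce the final result; toDouble avoids integer division, empty input gives 0.0
  def finish(b: AvgBuffer): Double = if (b.count == 0) 0.0 else b.sum.toDouble / b.count
  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

object UdafDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("UdafDemo").master("local[*]").getOrCreate()
    // Register the Aggregator as an untyped SQL aggregate function
    spark.udf.register("myAverage", functions.udaf(MyAverage))
    spark.read.json("D:\\code\\demo\\spark\\input\\people.json").createOrReplaceTempView("people")
    spark.sql("select myAverage(age) as avg_age from people").show()
    spark.stop()
  }
}
```

The reduce/merge/finish split mirrors add/merge/value in the accumulator, but the result stays inside the query plan instead of being read on the driver.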

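This article was written by hcb and is licensed under Creative Commons Attribution 3.0. You may freely repost and quote it, provided you credit the author and cite the source.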
