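UDF functions: one-to-one
A user-defined function (UDF) takes one input value per row and returns one output value.
A UDF must be registered before it can be called from SQL.
Implementation with an anonymous function
// Input data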
{"name":"liming","age":11}
{"name":"LiLei","age":21}
{"name":"Tom","age":31}
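import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

def main(args: Array[String]): Unit = {
  // Create the Spark configuration and set the application name
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
  // Read the input file, one JSON object per line
  val df: DataFrame = spark.read.json("D:\\code\\demo\\spark\\input\\people.json")
  // Register an anonymous function as a UDF named changeName
  spark.udf.register("changeName", (name: String) => "new" + name)
  // Expose the DataFrame to SQL as the view "people"
  df.createOrReplaceTempView("people")
  spark.sql("select changeName(name),age from people").show()
  spark.stop()
}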
// Output
+--------------------+---+
|UDF:changeName(name)|age|
+--------------------+---+
|           newliming| 11|
|            newLiLei| 21|
|              newTom| 31|
+--------------------+---+
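The example above registers the function by name so that SQL can call it. The same logic can also be applied through the DataFrame API by wrapping it with `udf` from `org.apache.spark.sql.functions`. The following is a minimal sketch, assuming Spark 2.x or later on the classpath; the object name `ChangeNameUdfSketch`, the value `addPrefix`, and the column alias `new_name` are made up for illustration, and the rows are built in memory instead of being read from `people.json`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object ChangeNameUdfSketch {
  // Plain Scala function holding the logic (hypothetical name),
  // so it can be checked without starting Spark.
  val addPrefix: String => String = name => "new" + name

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ChangeNameUdfSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Same rows as the JSON input, built in memory for this sketch
    val df = Seq(("liming", 11), ("LiLei", 21), ("Tom", 31)).toDF("name", "age")

    // Wrap the function as a column-level UDF; no SQL registration is needed
    val changeName = udf(addPrefix)
    df.select(changeName(col("name")).as("new_name"), col("age")).show()

    spark.stop()
  }
}
```

Keeping the logic in a plain function makes it easy to unit test. Note that a null `name` would be concatenated as the text "newnull", so real data may need an explicit null check.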
Custom accumulator
Compute the average age with an accumulator by defining a class that extends AccumulatorV2.
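import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2

def main(args: Array[String]): Unit = {
  // Create the Spark configuration and set the application name
  val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
  // Create the SparkContext, the entry point for submitting a Spark application
  val sc: SparkContext = new SparkContext(conf)
  // Test data: build an RDD of (name, age) pairs
  val rdd: RDD[(String, Int)] = sc.makeRDD(List(("tom", 12), ("Lilei", 12), ("Jerry", 13)))
  // Create the accumulator and register it with the SparkContext
  val myAcc = new MyAccumulator
  sc.register(myAcc)
  // Add every age to the accumulator inside an action
  rdd.foreach {
    case (name, age) => myAcc.add(age)
  }
  println("-----" + myAcc.value)
  // Stop the SparkContext
  sc.stop()
}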
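class MyAccumulator extends AccumulatorV2[Int, Double] {
  // Running sum of ages and number of people seen
  var aggSum = 0
  var countSum = 0
  // The accumulator is in its zero state when nothing has been added
  override def isZero: Boolean = {
    aggSum == 0 && countSum == 0
  }
  // Copy: create a new accumulator with the same state
  override def copy(): AccumulatorV2[Int, Double] = {
    val myAcc = new MyAccumulator
    myAcc.aggSum = this.aggSum
    myAcc.countSum = this.countSum
    myAcc
  }
  // Reset: at run time Spark first copies the accumulator, then resets the copy to zero
  override def reset(): Unit = {
    this.aggSum = 0
    this.countSum = 0
  }
  // Add one value: the age goes into the sum and the head count increases by 1
  override def add(age: Int): Unit = {
    aggSum += age
    countSum += 1
  }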
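  // Merge: if the other object is a MyAccumulator, add its sum and count to this one
  override def merge(other: AccumulatorV2[Int, Double]): Unit = {
    other match {
      case ac: MyAccumulator =>
        this.aggSum += ac.aggSum
        this.countSum += ac.countSum
      // Fail with a clear message instead of a MatchError for any other type
      case _ =>
        throw new UnsupportedOperationException(
          s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
    }
  }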
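  // Result: the average age. Convert to Double before dividing, because
  // Int / Int truncates; return 0.0 when nothing has been added.
  override def value: Double = {
    if (countSum == 0) 0.0 else aggSum.toDouble / countSum
  }
}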
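The accumulator keeps a sum and a count, not a running average, because each task updates its own copy and the driver later combines the copies through merge. The following is a minimal sketch of that flow in plain Scala with no Spark dependency; `AvgState` and `AvgStateDemo` are hypothetical names used only for illustration, and returning 0.0 for empty input is a choice made for this sketch.

```scala
// Hypothetical stand-in for MyAccumulator; it does not depend on Spark.
final class AvgState(var sum: Long = 0L, var count: Long = 0L) {
  def add(age: Int): Unit = { sum += age; count += 1 }
  def merge(other: AvgState): Unit = { sum += other.sum; count += other.count }
  // Average as a Double; 0.0 when nothing has been added (a choice made for this sketch)
  def value: Double = if (count == 0) 0.0 else sum.toDouble / count
}

object AvgStateDemo {
  // Imitates the accumulator flow: one local state per partition, merged afterwards
  def averageOf(partitions: Seq[Seq[Int]]): Double = {
    val total = new AvgState()
    partitions.foreach { part =>
      val local = new AvgState()
      part.foreach(local.add)
      total.merge(local)
    }
    total.value
  }

  def main(args: Array[String]): Unit = {
    // Two simulated partitions holding the ages from the test data
    println(averageOf(Seq(Seq(12, 12), Seq(13))))
  }
}
```

Averaging the per-partition averages would give (12 + 13) / 2 = 12.5 here, which differs from the true mean of 37 / 3, so the merge step has to combine sums and counts.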