累计器

executor中会把driver定义的变量,变成一个新的副本进行累计。不会影响driver中的变量的值

var sum=0; //driver中定义
rdd.foreach{ //executor中执行
    case (word,count)=>{sum+=count}
}
println(sum)  //0

普通定义的变量,executor中会把driver中定义的变量复制一个副本,在executor中执行。

如果需要通过executor,对driver定义的变量进行更新,需要定义为累加器

累加器和普通的变量相比,会将executor端的结果,收集到driver端进行汇总

val myacc: LongAccumulator = sc.longAccumulator(name = "myacc")
    rdd.foreach{
      case(work,count)=>{
        myacc.add(count)
      }
    }
 println(myacc)

ideal添加快捷键

File-setting-editor-live templates->output->右边添加live template->scc->输入代码-》define->scala

val conf:SparkConf = new SparkConf() .setAppName("SparkDemo").setMaster("local[*]")
val sc:SparkContext = new SparkContext()
sc.stop();

创建RDD

sc.parallelize等价于sc.makeRDD , 在源代码中,makeRDD会调用parallelize方法

创建rdd可以从文件中读取来创建,也可以从hdfs中读取来创建,也可以从定义的集合在来创建

//创建一个集合
val list: List[Int] = List(1, 2, 3, 4);

//方法一
// val rdd: RDD[Int] = sc.parallelize(list);

//方法二,默认的分区为cpu核数,如果sparkcontext选定的master是local[*]
val rdd = sc.makeRDD(list)
rdd.collect().foreach(println);

//从文件中读取,默认的分区最小值为2.
val rdd = sc.textFile("d:/code/demo/spark/input/1.txt");
rdd.collect().foreach(println);

//从hdfs中读取
val rdd: RDD[String] = sc.textFile("hdfs://master:8020/input")
rdd.collect().foreach(println);

makeRDD调用方法

def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
    parallelize(seq, numSlices)  //调用这个方法
}


本文由 hcb 创作,采用 知识共享署名 3.0,可自由转载、引用,但需署名作者且注明文章出处。

还不快抢沙发

添加新评论