参考IntAccumulatorParam的实现思路(上述文章中有讲):
网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、小程序设计、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了桂林免费建站欢迎大家使用!
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
// addInPlace有很多具体的实现类
// 如果想要实现自定义的话,就得实现这个方法
addInPlace(t1, t2)
}
}
自定义也可以通过这个方法去实现,从而兼容我们自定义的累加器
**
* 自定义的AccumulatorParam
*
* Created by lemon on 2018/7/28.
*/
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
// ++用于两个集合相加
r1++r2
}
override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
var data: Map[Int, Int] = Map()
data
}
}
/**
* 使用自定义的累加器,实现随机数
*
* Created by lemon on 2018/7/28.
*/
object CustomAccumulator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
val distData = sc.parallelize(1 to 10)
val mapCount = distData.map(x => {
val randomNum = new Random().nextInt(20)
// 构造一个k-v对
val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
uniqueKeyAccumulator += map
})
println(mapCount.count())
// 获取到累加器的值 中的key值,并进行打印
uniqueKeyAccumulator.value.keys.foreach(println)
sc.stop()
}
}
运行结果如下图:## 思路 & 需求
参考IntAccumulatorParam的实现思路(上述文章中有讲):
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T): T = {
// addInPlace有很多具体的实现类
// 如果想要实现自定义的话,就得实现这个方法
addInPlace(t1, t2)
}
}
自定义也可以通过这个方法去实现,从而兼容我们自定义的累加器
**
* 自定义的AccumulatorParam
*
* Created by lemon on 2018/7/28.
*/
object UniqueKeyAccumulator extends AccumulatorParam[Map[Int, Int]] {
override def addInPlace(r1: Map[Int, Int], r2: Map[Int, Int]): Map[Int, Int] = {
// ++用于两个集合相加
r1++r2
}
override def zero(initialValue: Map[Int, Int]): Map[Int, Int] = {
var data: Map[Int, Int] = Map()
data
}
}
/**
* 使用自定义的累加器,实现随机数
*
* Created by lemon on 2018/7/28.
*/
object CustomAccumulator {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("CustomAccumulator").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val uniqueKeyAccumulator = sc.accumulable(Map[Int, Int]())(UniqueKeyAccumulator)
val distData = sc.parallelize(1 to 10)
val mapCount = distData.map(x => {
val randomNum = new Random().nextInt(20)
// 构造一个k-v对
val map: Map[Int, Int] = Map[Int, Int](randomNum -> randomNum)
uniqueKeyAccumulator += map
})
println(mapCount.count())
// 获取到累加器的值 中的key值,并进行打印
uniqueKeyAccumulator.value.keys.foreach(println)
sc.stop()
}
}
运行结果如下图:
网页名称:生产常用Spark累加器剖析之三(自定义累加器)
分享路径:https://www.cdcxhl.com/article8/psojip.html
成都网站建设公司_创新互联,为您提供微信公众号、ChatGPT、、面包屑导航、定制网站、移动网站建设
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 创新互联