TreeviewCopyright © aleen42 all right reserved, powered by aleen42
Flink批处理
1. Flink的累加器
Accumulator即累加器,与MapReduce counter`的应用场景差不多,都能很好地观察task在运行期间的数据变化
可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。
- IntCounter
- LongCounter
- DoubleCounter
操作步骤:
1:创建累加器
private IntCounter numLines = new IntCounter();
2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:使用累加器
this.numLines.add(1);
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")
示例:
遍历下列数据, 打印出单词的总数
"a","b","c","d"
开发步骤:
- 获取批处理环境
- 加载本地集合
- map转换
- 定义累加器
- 注册累加器
- 累加数据
- 数据写入到文件中
- 执行任务,获取任务执行结果对象(JobExecutionResult)
- 获取累加器数值
- 打印数值
代码:
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
* counter 累加器
* Created by zhangjingcun.tech on 2018/10/30.
*/
object BatchDemoCounter {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val data = env.fromElements("a","b","c","d")
val res = data.map(new RichMapFunction[String,String] {
//1:定义累加器
val numLines = new IntCounter
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//2:注册累加器
getRuntimeContext.addAccumulator("num-lines",this.numLines)
}
var sum = 0;
override def map(value: String) = {
//如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
sum += 1;
System.out.println("sum:"+sum);
this.numLines.add(1)
value
}
}).setParallelism(1)
// res.print();
res.writeAsText("d:\\data\\count0")
val jobResult = env.execute("BatchDemoCounterScala")
// //3:获取累加器
val num = jobResult.getAccumulatorResult[Int]("num-lines")
println("num:"+num)
}
}
Flink Broadcast和Accumulators的区别
Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改
Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作