TreeviewCopyright © aleen42 all right reserved, powered by aleen42

Flink批处理

1. Flink的累加器

Accumulator即累加器,与MapReduce counter`的应用场景差不多,都能很好地观察task在运行期间的数据变化

可以在Flink job任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。

Flink现在有以下内置累加器。每个累加器都实现了Accumulator接口。

  • IntCounter
  • LongCounter
  • DoubleCounter

操作步骤:

1:创建累加器
private IntCounter numLines = new IntCounter(); 
2:注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3:使用累加器
this.numLines.add(1); 
4:获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")

示例:

遍历下列数据, 打印出单词的总数

"a","b","c","d"

开发步骤:

  1. 获取批处理环境
  2. 加载本地集合
  3. map转换
    1. 定义累加器
    2. 注册累加器
    3. 累加数据
  4. 数据写入到文件中
  5. 执行任务,获取任务执行结果对象(JobExecutionResult)
  6. 获取累加器数值
  7. 打印数值

代码:

import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * counter 累加器
  * Created by zhangjingcun.tech on 2018/10/30.
  */
object BatchDemoCounter {
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.api.scala._

    val data = env.fromElements("a","b","c","d")

    val res = data.map(new RichMapFunction[String,String] {
      //1:定义累加器
      val numLines = new IntCounter

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        //2:注册累加器
        getRuntimeContext.addAccumulator("num-lines",this.numLines)
      }

      var sum = 0;
      override def map(value: String) = {
        //如果并行度为1,使用普通的累加求和即可,但是设置多个并行度,则普通的累加求和结果就不准了
        sum += 1;
        System.out.println("sum:"+sum);
        this.numLines.add(1)
        value
      }

    }).setParallelism(1)
    //    res.print();
    res.writeAsText("d:\\data\\count0")
    val jobResult = env.execute("BatchDemoCounterScala")
    //    //3:获取累加器
    val num = jobResult.getAccumulatorResult[Int]("num-lines")
    println("num:"+num)
  }
}

Flink Broadcast和Accumulators的区别

Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改

Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作

Copyright © liufeihua.com 2021 all right reserved,powered by Gitbook该文件修订时间: 2021-04-14 09:47:10

results matching ""

    No results matching ""