TreeviewCopyright © aleen42 all right reserved, powered by aleen42

Flink批处理

1.Flink的分布式缓存

Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!

缓存的使用流程:

使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!

注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上

操作步骤:

1:注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")  
2:访问数据
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

示例:

遍历下列数据, 并在open方法中获取缓存的文件

a,b,c,d

代码:

import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * 分布式缓存
  */
object BatchDemoDisCache { 
  def main(args: Array[String]): Unit = {
    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //隐式转换
    import org.apache.flink.api.scala._

    //1:注册文件
    env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")

    //读取数据
    val data = env.fromElements("a","b","c","d")
    val result = data.map(new RichMapFunction[String,String] {

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        //访问数据
        val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")
        val lines = FileUtils.readLines(myFile)
        val it = lines.iterator()
        while (it.hasNext){
          val line = it.next();
          println("line:"+line)
        }
      }
      override def map(value: String) = {
        value
      }
    })
    result.print()
  }
}
Copyright © liufeihua.com 2021 all right reserved,powered by Gitbook该文件修订时间: 2021-04-14 09:46:53

results matching ""

    No results matching ""