TreeviewCopyright © aleen42 all right reserved, powered by aleen42
Flink批处理
1.Flink的分布式缓存
Flink提供了一个类似于Hadoop的分布式缓存,让并行运行实例的函数可以在本地访问。这个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等!
缓存的使用流程:
使用ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS上的文件),为缓存文件指定一个名字注册该缓存文件。当程序执行时候,Flink会自动将复制文件或者目录到所有worker节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
注意:广播是将变量分发到各个worker节点的内存上,分布式缓存是将文件缓存到各个worker节点上
操作步骤:
1:注册一个文件
env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
2:访问数据
File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");
示例:
遍历下列数据, 并在open方法中获取缓存的文件
a,b,c,d
代码:
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
/**
* 分布式缓存
*/
object BatchDemoDisCache {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
//隐式转换
import org.apache.flink.api.scala._
//1:注册文件
env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")
//读取数据
val data = env.fromElements("a","b","c","d")
val result = data.map(new RichMapFunction[String,String] {
override def open(parameters: Configuration): Unit = {
super.open(parameters)
//访问数据
val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")
val lines = FileUtils.readLines(myFile)
val it = lines.iterator()
while (it.hasNext){
val line = it.next();
println("line:"+line)
}
}
override def map(value: String) = {
value
}
})
result.print()
}
}