Flink Batch Processing
1. Flink Batch Sinks
Flink's batch (DataSet) API commonly provides two kinds of sinks:
- Collection-based sink (writing results to a local collection)
- File-based sink (writing results to files)
1.1. Collection-based sink
Goal:
Using the data below, print it to standard output, print it to standard error, and gather it into a local collection with collect():
(19, "zhangsan", 178.8),
(17, "lisi", 168.8),
(18, "wangwu", 184.8),
(21, "zhaoliu", 164.8)
Code:
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

object BatchSinkCollection {
  def main(args: Array[String]): Unit = {
    // 1. Set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. Define the data: stu(age, name, height)
    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "zhangsan", 178.8),
      (17, "lisi", 168.8),
      (18, "wangwu", 184.8),
      (21, "zhaoliu", 164.8)
    )

    // 3. Sink to standard output
    stu.print()

    // 4. Sink to standard error
    stu.printToErr()

    // 5. Sink to a local collection
    print(stu.collect())

    // No env.execute() here: print(), printToErr() and collect() are
    // eager sinks that trigger job execution themselves.
  }
}
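Note that in the DataSet API, print(), printToErr(), and collect() are eager operations: each of them triggers execution of the job on its own, so no separate env.execute() call is needed. In fact, calling execute() afterwards fails at runtime, because no new sinks have been defined since the last execution.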
1.2. File-based sink
Flink can write files to a variety of storage systems, including the local filesystem and HDFS.
It also supports several file formats, including plain text and CSV.
writeAsText() / TextOutputFormat: writes elements as strings, one per line; the string is obtained by calling each element's toString() method.
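For the CSV format mentioned above, the companion sink is writeAsCsv(), which is only defined for tuple DataSets. A minimal sketch follows; the object name, output path, and the explicitly spelled-out delimiters (they match the defaults) are illustrative, not part of the original example:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object BatchSinkCsv {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val stu: DataSet[(Int, String, Double)] = env.fromElements(
      (19, "zhangsan", 178.8),
      (17, "lisi", 168.8)
    )

    // writeAsCsv only works on tuple types; one line per tuple,
    // fields separated by fieldDelimiter
    stu.writeAsCsv("test/data1/stu.csv", rowDelimiter = "\n",
        fieldDelimiter = ",", writeMode = WriteMode.OVERWRITE)
      .setParallelism(1)

    env.execute("csv sink")
  }
}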
1.2.1. Writing data to a local file
Goal:
Write the data below to a file:
Map(1 -> "spark", 2 -> "flink")
Code:
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.api.scala._

/**
 * Write data to a local file.
 */
object BatchSinkFile {
  def main(args: Array[String]): Unit = {
    // 1. Set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. Define the data
    val ds1: DataSet[Map[Int, String]] = env.fromElements(Map(1 -> "spark", 2 -> "flink"))

    // 3. Write to a local text file. With WriteMode.NO_OVERWRITE the job
    //    fails if the file already exists; WriteMode.OVERWRITE replaces it.
    //    setParallelism(1) is applied to the sink (writeAsText returns a
    //    DataSink) so that a single output file is produced.
    ds1.writeAsText("test/data1/aa", WriteMode.OVERWRITE).setParallelism(1)

    env.execute()
  }
}
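A detail worth knowing about file sinks: if the sink runs with parallelism greater than 1, the output path is created as a directory containing one numbered file per parallel subtask; with sink parallelism 1, the path is written as a single file. That is why the example pins the sink to setParallelism(1).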
1.2.2. Writing data to HDFS
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.api.scala._

/**
 * Write data to HDFS.
 */
object BatchSinkHdfs {
  def main(args: Array[String]): Unit = {
    // 1. Set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // 2. Define the data
    val ds1: DataSet[Map[Int, String]] = env.fromElements(Map(1 -> "spark", 2 -> "flink"))

    // 3. Write to HDFS as a text file. WriteMode semantics are the same as
    //    for local files: NO_OVERWRITE fails if the path already exists,
    //    OVERWRITE replaces it.
    ds1.writeAsText("hdfs://bigdata111:9000/a", WriteMode.OVERWRITE).setParallelism(1)

    env.execute()
  }
}
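Running this example assumes an HDFS NameNode reachable at bigdata111:9000 (the hostname and port belong to this example's cluster; substitute your own) and Hadoop client dependencies on the Flink classpath. As a quick way to verify the result, you can read the file back with the same API; a minimal sketch, assuming the hdfs://bigdata111:9000/a path just written (the object name is illustrative):

import org.apache.flink.api.scala._

object BatchReadBack {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // readTextFile produces one String element per line of the file(s)
    // under the given path
    val lines: DataSet[String] = env.readTextFile("hdfs://bigdata111:9000/a")

    lines.print() // eager sink: triggers execution by itself
  }
}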