TreeviewCopyright © aleen42 all right reserved, powered by aleen42
Flink流处理
1. Flink在流处理上常见的sink
Flink将数据进行sink操作到本地文件/本地集合/HDFS等和之前的批处理操作一致.这里重点说下sink到Kafka以及MySQL的操作
1.1. Sink到Kafka
kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
示例
读取MySql的数据, 落地到Kafka中
开发步骤
- 创建流处理环境
- 设置并行度
- 添加自定义MySql数据源
- 转换元组数据为字符串
- 构建
FlinkKafkaProducer010 - 添加sink
- 执行任务
代码
object DataSink_kafka {
def main(args: Array[String]): Unit = {
// 1. 创建流处理环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 设置并行度
env.setParallelism(1)
// 3. 添加自定义MySql数据源
val source: DataStream[(Int, String, String, String)] = env.addSource(new MySql_source)
// 4. 转换元组数据为字符串
val strDataStream: DataStream[String] = source.map(
line => line._1 + line._2 + line._3 + line._4
)
//5. 构建FlinkKafkaProducer010
val p: Properties = new Properties
p.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
val sink = new FlinkKafkaProducer010[String]("test2", new SimpleStringSchema(), p)
// 6. 添加sink
strDataStream.addSink(sink)
// 7. 执行任务
env.execute("flink-kafka-wordcount")
}
}
1.2. Sink到MySQL
示例
加载下列本地集合,导入MySql中
List(
(10, "dazhuang", "123456", "大壮"),
(11, "erya", "123456", "二丫"),
(12, "sanpang", "123456", "三胖")
)
开发步骤
- 创建流执行环境
- 准备数据
- 添加sink
- 构建自定义Sink,继承自
RichSinkFunction - 重写
open方法,获取Connection和PreparedStatement - 重写
invoke方法,执行插入操作 - 重写
close方法,关闭连接操作
- 构建自定义Sink,继承自
- 执行任务
代码
object DataSink_MySql {
def main(args: Array[String]): Unit = {
//1.创建流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.准备数据
val value: DataStream[(Int, String, String, String)] = env.fromCollection(List(
(10, "dazhuang", "123456", "大壮"),
(11, "erya", "123456", "二丫"),
(12, "sanpang", "123456", "三胖")
))
// 3. 添加sink
value.addSink(new MySql_Sink)
//4.触发流执行
env.execute()
}
}
// 自定义落地MySql的Sink
class MySql_Sink extends RichSinkFunction[(Int, String, String, String)] {
private var connection: Connection = null
private var ps: PreparedStatement = null
override def open(parameters: Configuration): Unit = {
//1:加载驱动
Class.forName("com.mysql.jdbc.Driver")
//2:创建连接
connection = DriverManager.getConnection("jdbc:mysql:///test", "root", "123456")
//3:获得执行语句
val sql = "insert into user(id , username , password , name) values(?,?,?,?);"
ps = connection.prepareStatement(sql)
}
override def invoke(value: (Int, String, String, String)): Unit = {
try {
//4.组装数据,执行插入操作
ps.setInt(1, value._1)
ps.setString(2, value._2)
ps.setString(3, value._3)
ps.setString(4, value._4)
ps.executeUpdate()
} catch {
case e: Exception => println(e.getMessage)
}
}
//关闭连接操作
override def close(): Unit = {
if (connection != null) {
connection.close()
}
if (ps != null) {
ps.close()
}
}
}