TreeviewCopyright © aleen42 all right reserved, powered by aleen42

Flink流处理

1. DataStream的Transformation

和DataSet批处理一样,DataStream也包括一系列的Transformation操作.

流数据处理和批数据处理有很多操作是类似的,所以就不再一 一讲解。我们主要讲解,和批处理不一样的一些操作。

1.1. keyBy

按照指定的key来进行分流,类似于批处理中的groupBy。可以按照索引名/字段名来指定分组的字段.

示例

读取socket数据源, 进行单词的计数

开发步骤

  1. 获取流处理运行环境
  2. 设置并行度
  3. 获取数据源
  4. 转换操作
    1. 以空白进行分割
    2. 给每个单词计数1
    3. 根据单词分组
    4. 求和
  5. 打印到控制台
  6. 执行任务

代码

/**
  * KeyBy算子的使用
  */
object Transformation_KeyBy {
  def main(args: Array[String]): Unit = {
    // 1.获取流处理运行环境
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // 2.设置并行度
    senv.setParallelism(3)
    //3. 获取Socket数据源
    val stream = senv.socketTextStream("node01", 9999, '\n')
    //4. 转换操作,以空格切分,每个元素计数1,以单词分组,累加
    val text = stream.flatMap(_.split("\\s"))
      .map((_,1))
      //TODO 逻辑上将一个流分成不相交的分区,每个分区包含相同键的元素。在内部,这是通过散列分区来实现的
      .keyBy(_._1)
      //TODO 这里的sum并不是分组去重后的累加值,如果统计去重后累加值,则使用窗口函数
      .sum(1)
    //5. 打印到控制台
    text.print()
    //6. 执行任务
    senv.execute()
  }
}

1.2. Connect

Connect用来将两个DataStream组装成一个ConnectedStreams。它用了两个泛型,即不要求两个dataStream的element是同一类型。这样我们就可以把不同的数据组装成同一个结构.

示例

读取两个不同类型的数据源,使用connect进行合并打印。

开发步骤

  1. 创建流式处理环境
  2. 添加两个自定义数据源
  3. 使用connect合并两个数据流,创建ConnectedStreams对象
  4. 遍历ConnectedStreams对象,转换为DataStream
  5. 打印输出,设置并行度为1
  6. 执行任务

自定义数据源

/**
  * 创建自定义并行度为1的source 
  * 实现从1开始产生递增数字 
  */
class MyLongSourceScala extends SourceFunction[Long] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[Long]) = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}

/**
  * 创建自定义并行度为1的source
  * 实现从1开始产生递增字符串
  */
class MyStringSourceScala extends SourceFunction[String] {
  var count = 1L
  var isRunning = true

  override def run(ctx: SourceContext[String]) = {
    while (isRunning) {
      ctx.collect("str_" + count)
      count += 1
      TimeUnit.SECONDS.sleep(1)
    }
  }

  override def cancel() = {
    isRunning = false
  }
}

代码


object StreamingDemoConnectScala {
  def main(args: Array[String]): Unit = {
    // 1. 创建流式处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 添加两个自定义数据源
    val text1: DataStream[Long] = env.addSource(new MyLongSourceScala)
    val text2: DataStream[String] = env.addSource(new MyStringSourceScala)
    // 3. 使用connect合并两个数据流,创建ConnectedStreams对象
    val connectedStreams: ConnectedStreams[Long, String] = text1.connect(text2)
    // 4. 遍历ConnectedStreams对象,转换为DataStream
    val result: DataStream[Any] = connectedStreams.map(line1 => {
      line1
    }, line2 => {
      line2
    })
    // 5. 打印输出,设置并行度为1
    result.print().setParallelism(1)
    // 6. 执行任务
    env.execute("StreamingDemoWithMyNoParallelSourceScala")
  }
}

1.3. split和select

split就是将一个DataStream分成多个流,用SplitStream来表示

DataStream → SplitStream

select就是获取分流后对应的数据,跟split搭配使用,从SplitStream中选择一个或多个流

SplitStream → DataStream

示例

加载本地集合(1,2,3,4,5,6), 使用split进行数据分流,分为奇数和偶数. 并打印奇数结果

开发步骤

  1. 创建流处理环境
  2. 设置并行度
  3. 加载本地集合
  4. 数据分流,分为奇数和偶数
  5. 获取分流后的数据
  6. 打印数据
  7. 执行任务

代码:

/**
  * 演示Split和Select方法
  * Split: DataStream->SplitStream
  * Select: SplitStream->DataStream
  */
object SplitAndSelect {

  def main(args: Array[String]): Unit = {
    // 1. 创建批处理环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. 设置并行度
    env.setParallelism(1)
    // 3. 加载本地集合
    val elements: DataStream[Int] = env.fromElements(1, 2, 3, 4, 5, 6)
    // 4. 数据分流,分为奇数和偶数
    val split_data: SplitStream[Int] = elements.split(
      (num: Int) =>
        num % 2 match {
          case 0 => List("even")
          case 1 => List("odd")
        }
    )
    // 5. 获取分流后的数据
    val even: DataStream[Int] = split_data.select("even")
    val odd: DataStream[Int] = split_data.select("odd")
    val all: DataStream[Int] = split_data.select("odd", "even")

    // 6. 打印数据
    odd.print()

    // 7. 执行任务
    env.execute()
  }
}
Copyright © liufeihua.com 2021 all right reserved,powered by Gitbook该文件修订时间: 2021-04-14 10:07:52

results matching ""

    No results matching ""