Flink批处理

1. Flink批处理DataSource

DataSource 是什么呢?就字面意思其实就可以知道:数据来源。

Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去,这个 Data Sources 就是数据的来源地。

Flink在批处理中常见的source主要有两大类。

  • 基于本地集合的source(Collection-based-source)

  • 基于文件的source(File-based-source)

1.1. 基于本地集合的Source

在Flink中最常见的创建本地集合的DataSet方式有三种。

  1. 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。

  2. 使用env.fromCollection(),这种方式支持多种Collection的具体类型。

  3. 使用env.generateSequence(),这种方法创建基于Sequence的DataSet。

使用方式如下:

import org.apache.flink.api.scala.ExecutionEnvironment
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/**
  * 读取集合中的批次数据
  */
object BatchFromCollection {
  def main(args: Array[String]): Unit = {

    //获取flink执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换
    import org.apache.flink.api.scala._

    //0.用element创建DataSet(fromElements)
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()

    //1.用Tuple创建DataSet(fromElements)
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()

    //2.用Array创建DataSet
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()

    //3.用ArrayBuffer创建DataSet
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()

    //4.用List创建DataSet
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()

    //5.用List创建DataSet
    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()

    //6.用Vector创建DataSet
    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
    ds6.print()

    //7.用Queue创建DataSet
    val ds7: DataSet[String] = env.fromCollection(mutable.Queue("spark", "flink"))
    ds7.print()

    //8.用Stack创建DataSet
    val ds8: DataSet[String] = env.fromCollection(mutable.Stack("spark", "flink"))
    ds8.print()

    //9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)
    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
    ds9.print()

    //10.用Seq创建DataSet
    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
    ds10.print()

    //11.用Set创建DataSet
    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
    ds11.print()

    //12.用Iterable创建DataSet
    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    ds12.print()

    //13.用ArraySeq创建DataSet
    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()

    //14.用ArrayStack创建DataSet
    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()

    //15.用Map创建DataSet
    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    ds15.print()

    //16.用Range创建DataSet
    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
    ds16.print()

    //17.用fromElements创建DataSet
    val ds17: DataSet[Long] = env.generateSequence(1, 9)
    ds17.print()
  }
}

1.2. 基于文件的Source


Flink支持直接从外部文件存储系统中读取文件的方式来创建Source数据源,Flink支持的方式有以下几种:

  1. 读取本地文件数据
  2. 读取HDFS文件数据
  3. 读取CSV文件数据
  4. 读取压缩文件
  5. 遍历目录

下面分别演示每个数据源的加载方式:

1.2.1. 读取本地文件

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, GroupedDataSet}

/**
  * 读取文件中的批次数据
  */
object BatchFromFile {
  def main(args: Array[String]): Unit = {
    //使用readTextFile读取本地文件
    //初始化环境
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    //加载数据
    val datas: DataSet[String] = environment.readTextFile("data.txt")

    //触发程序执行
    datas.print()
  }
}

1.2.2. 读取HDFS文件数据

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, GroupedDataSet}

/**
  * 读取文件中的批次数据
  */
object BatchFromFile {
  def main(args: Array[String]): Unit = {
    //使用readTextFile读取本地文件
    //初始化环境
    val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
    //加载数据 
    val datas: DataSet[String] = environment.readTextFile("hdfs://node01:8020/README.txt")
    //触发程序执行
    datas.print()
  }
}

1.2.3. 读取CSV文件数据

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, GroupedDataSet}
/**
  * 读取CSV文件中的批次数据
  */
object BatchFromCsvFile {
  def main(args: Array[String]): Unit = {
    //初始化环境
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

      // 用于映射CSV文件的样例类
    case class Student(id:Int, name:String)    

    val csvDataSet = env.readCsvFile[Student]("./data/input/subject.csv")
    csvDataSet.print()
    //触发程序执行
    datas.print()
  }
}

1.2.4. 读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩格式 扩展名 并行化
DEFLATE .deflate no
GZIP .gz .gzip no
Bzip2 .bz2 no
XZ .xz no
import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * 读取压缩文件的数据
  */
object BatchFromCompressFile {
  def main(args: Array[String]): Unit = {
    //初始化环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //加载数据
    val result = env.readTextFile("D:\\BaiduNetdiskDownload\\hbase-1.3.1-bin.tar.gz")
    //触发程序执行
    result.print()
  }
}

1.2.5. 遍历目录

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。

对于从文件中读取数据,当读取的数个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用recursive.file.enumeration进行递归读取

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.configuration.Configuration

/**
  * 遍历目录的批次数据
  */
object BatchFromFolder {
  def main(args: Array[String]): Unit = {
    //初始化环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    val parameters = new Configuration
    // recursive.file.enumeration 开启递归
    parameters.setBoolean("recursive.file.enumeration", true)

    val result = env.readTextFile("D:\\data\\dataN").withParameters(parameters)
    //触发程序执行
    result.print()
  }
}
Copyright © liufeihua.com 2021 all right reserved,powered by Gitbook该文件修订时间: 2021-04-14 09:38:46

results matching ""

    No results matching ""