专栏原创出处:github-源笔记文件 (opens new window)github-源码 (opens new window),欢迎 Star,转载请附上原文出处链接和本声明。
本节内容部分源码 (opens new window)

# 1 示例程序

Batch.scala (opens new window)

// 操作原始 DataSet API 完成 2 个表数据过滤 join 聚合操作
object BatchDataSet extends BatchExecutionEnvironmentApp {

  // 用户登录数据 DataSet
  val userLoginDataSet = DataSet.userLogin(this)
  // 角色登录数据 DataSet
  val roleLoginDataSet = DataSet.roleLogin(this)

    .filter(_.dataUnix > 1571414499)
    .filter(_.status == "LOGIN")
    .join(roleLoginDataSet, JoinHint.BROADCAST_HASH_FIRST).where(_.uid).equalTo(_.uid)
    .apply((left, _) => left.platform -> 1)
    .sortPartition(1, Order.ASCENDING)

// 操作 Table API 完成 2 个表数据过滤 join 聚合操作
object BatchTable extends BatchTableEnvironmentApp {

  private val userLogin = RegisterDataSet.userLogin(this)
  private val roleLogin = RegisterDataSet.roleLogin(this)

    .where('dataUnix > 1571414499 && 'status === "LOGIN")
    .join(btEnv.scan(roleLogin).select("uid as r_uid"), "uid = r_uid")
    .select("platform as p , count(platform) as c")
    .toDataSet[(String, Long)]

// 操作 SQL 完成 2 个表数据过滤 join 聚合操作
object BatchSQL extends BatchTableEnvironmentApp {

  private val table = RegisterDataSet.userLogin(this)

       |SELECT platform AS p,COUNT(platform) AS c FROM
       |SELECT platform,dataUnix,uid,status FROM $table
       |WHERE dataUnix > 0 AND status = 'LOGIN'
       |GROUP BY platform
    .toDataSet[(String, Long)]

# 2 程序数据源输入(Data Sources)

# 2.1 基于文件:

  • readTextFile(path)// TextInputFormat-逐行读取文件,并将它们作为字符串返回。
  • readTextFileWithValue(path)// TextValueInputFormat-逐行读取文件,并将它们作为 StringValues 返回。StringValues 是可变字符串。
  • readCsvFile(path)// CsvInputFormat-解析以逗号(或其他字符)分隔的字段的文件。返回元组,case class 对象或 POJO 的数据集。支持基本的 Java 类型及其与 Value 相对应的字段类型。
  • readFileOfPrimitives(path, delimiter)// PrimitiveInputFormat-解析以换行符(或其他 char 序列)定界的原始数据类型的文件,例如 String 或 Integer 使用给定的定界符。
  • readSequenceFile(Key, Value, path)// SequenceFileInputFormat-创建 JobConf 并从指定的路径中读取类型为 SequenceFileInputFormat,Key 类和 Value 类的文件,并将它们作为 Tuple2 <Key,Value> 返回。

# 2.2 基于集合:

  • fromCollection(Iterable)-从 Iterable 创建数据集。Iterable 返回的所有元素都必须是同一类型。
  • fromCollection(Iterator)-从迭代器创建数据集。该类指定迭代器返回的元素的数据类型。
  • fromElements(elements: _*)-从给定的对象序列创建数据集。所有对象必须具有相同的类型。
  • fromParallelCollection(SplittableIterator)-从迭代器并行创建数据集。该类指定迭代器返回的元素的数据类型。
  • generateSequence(from, to) -并行生成给定间隔中的数字序列。

# 2.3 通用:

  • readFile(inputFormat, path)// FileInputFormat-接受文件输入格式。
  • createInput(inputFormat)// InputFormat-接受通用输入格式。

# 3 程序数据输出(Data Sinks)

  • writeAsText()// TextOutputFormat-将元素按行写为字符串。通过调用每个元素的 toString()方法获得字符串。
  • writeAsCsv(...)// CsvOutputFormat-将元组写为逗号分隔的值文件。行和字段定界符是可配置的。每个字段的值来自对象的 toString()方法。
  • print()// printToErr()- 在标准输出/标准错误流上打印每个元素的 toString()值。
  • write()// FileOutputFormat-自定义文件输出的方法和基类。支持自定义对象到字节的转换。
  • output()// OutputFormat-最通用的输出方法,用于不基于文件的数据接收器(例如将结果存储在数据库中)。

# 4 转换操作(Transformations)

# 4.1 Map

Map 转换将用户定义的 map 函数应用于 DataSet 的每个元素。它实现了一对一的映射,也就是说,函数必须恰好返回一个元素。

val intPairs: DataSet[(Int, Int)] = // [...]
val intSums = intPairs.map { pair => pair._1 + pair._2 }

# 4.2 FlatMap

FlatMap 转换在数据集的每个元素上应用用户定义的 FlatMap 方法。映射函数的可以为每个输入元素返回任意许多结果元素(包括无结果元素)。

val textLines: DataSet[String] = // [...]
val words = textLines.flatMap { _.split(" ") }

# 4.3 MapPartition

MapPartition 在单个函数调用中转换并行分区。map-partition 函数将分区获取为 Iterable,并可以产生任意数量的结果值。每个分区中元素的数量取决于并行度和先前的操作。

val textLines: DataSet[String] = // [...]
// 因为返回值必须是 Collection,所以需要 Some
// 从 Option 到 Collection 的隐式转换
val counts = texLines.mapPartition { in => Some(in.size) }

# 4.4 Filter

Filter 转换将用户定义的过滤器功能应用于 DataSet 的每个元素,并仅保留函数返回的那些元素 true。

val intNumbers: DataSet[Int] = // [...]
val naturalNumbers = intNumbers.filter { _ > 0 }

# 4.5 Projection of Tuple DataSet(元组数据集投影)

project 转换将删除或移动元组数据集的元组字段。该 project(int...) 方法选择应由其索引保留的元组字段,并在输出元组中定义其顺序。 project 不需要定义函数体

DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0);

请注意,Java 编译器无法推断 project 运算符的返回类型。如果您根据 project 运算符的结果调用另一个运算符,则可能会导致问题,例如:

DataSet<Tuple5<String,String,String,String,String>> ds = ....
DataSet<Tuple1<String>> ds2 = ds.project(0).distinct(0);

可以通过如下提示 project 操作符的返回类型来克服此问题:

DataSet<Tuple1<String>> ds2 = ds.<Tuple1<String>>project(0).distinct(0);

# 4.6 Transformations on Grouped DataSet(分组数据集的转换)

reduce 操作可以对分组的数据集进行操作。指定用于分组的密钥可以通过多种方式完成:

  • 关键表达,groupBy("key")
  • 键选择器功能,implements KeySelector
  • 一个或多个字段位置键(仅限元组数据集),groupBy(0, 1)
  • 案例类别字段(仅案例类别),groupBy("a", "b")

# 4.7 Reduce on Grouped DataSet(减少分组数据集)

应用于分组数据集的 Reduce 转换使用用户定义的 reduce 函数将每个组简化为单个元素。
对于每组输入元素,reduce 函数依次将成对的元素组合为一个元素,直到每组只剩下一个元素为止。

// some ordinary POJO
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  // [...]

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy { _.word } reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)

# 4.8 GroupReduce on Grouped DataSet(分组数据集上的 GroupReduce)

应用于分组数据集的 GroupReduce 转换为每个组调用用户定义的 group-reduce 函数。
此与 Reduce 的区别在于,用户定义的函数可一次获取整个组。该函数在组的所有元素上使用 Iterable 调用,并且可以返回任意数量的结果元素。

val input: DataSet[(Int, String)] = // [...]
val output = input.groupBy(0).reduceGroup {
      (in, out: Collector[(Int, String)]) =>
        in.toSet foreach (out.collect)

# 4.9 可组合的 GroupReduce 函数

object GroupReduceOnCombinatorGroupReduceFunctions extends Transformations {
    .map(o => (o.uid, o.dataUnix, o.money))
    .filter(_._1.contains("1|1051")) // 筛选部分用户
    .groupBy(0, 1) // 按用户 ID 分组
    .sortGroup(2, Order.ASCENDING) // 分组排序,按订单金额升序
    .reduceGroup(new MyCombinableGroupReducer())

  /* sortGroup 后原始数据集
   /* 计算结果为

  * 与 reduce 函数相比,group-reduce 函数不是可隐式组合的。
  * 为了使 group-reduce 函数可组合,它必须实现 GroupCombineFunction 接口。
  * 要点:GroupCombineFunction 接口的通用输入和输出类型必须等于 GroupReduceFunction 的通用输入类型,
  * 如以下示例所示:
import scala.collection.JavaConverters._

class MyCombinableGroupReducer
  extends GroupReduceFunction[(String, Int, Double), (String, Int, Double)]
    with GroupCombineFunction[(String, Int, Double), (String, Int, Double)] {
  override def reduce(values: lang.Iterable[(String, Int, Double)],
                      out: Collector[(String, Int, Double)]): Unit = {
    values.iterator().asScala.foreach(o => out.collect(o))

  override def combine(values: lang.Iterable[(String, Int, Double)],
                       out: Collector[(String, Int, Double)]): Unit = {
    val r = values.iterator().asScala.reduce((o1, o2) => (o1._1, o1._2, o1._3 + o2._3))
    out.collect(r) // 合并相同 key 的价格

# 4.10 GroupCombine on a Grouped DataSet(分组数据集上的 GroupCombine)

GroupCombine 转换是可组合 GroupReduceFunction 中的合并步骤的通用形式。
从某种意义上讲,它是广义的,它允许将输入类型组合为 I 任意输出类型 O。
相反,GroupReduce 中的 Combine 步骤仅允许从 input type I 到 output type 的组合 I。
这是因为 GroupReduceFunction 中的 reduce 步骤需要输入类型 I。

在一些应用中,期望在执行附加转换之前将数据集组合成中间格式(例如,以减小数据大小)。这可以通过具有很少成本的 CombineGroup 转换来实现。

注意:分组数据集上的 GroupCombine 使用贪婪策略在内存中执行,该策略可能不会一次处理所有数据,而是分多个步骤进行。它也可以在单个分区上执行,而无需像 GroupReduce 转换那样进行数据交换。这可能会导致部分结果。

下面的示例演示将 CombineGroup 转换用于替代 WordCount 实现。

val input: DataSet[String] = [..] // The words received as input

val combinedWords: DataSet[(String, Int)] = input
  .combineGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var count = 0

        for (word <- words) {
            key = word
            count += 1
        out.collect((key, count))

val output: DataSet[(String, Int)] = combinedWords
  .reduceGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null
        var sum = 0

        for ((word, sum) <- words) {
            key = word
            sum += count
        out.collect((key, sum))

# 4.11 Aggregate on Grouped Tuple DataSet(在分组元组数据集聚合)

  • 有一些常用的聚合操作。聚合转换提供以下内置聚合功能:
  • Sum
  • Min, and
  • Max


// 角色付费数据 DataSet
  val roleLoginDs = DataSet.rolePay(this)

  { // aggregate
      .map(o => (o.uid, o.dataUnix, o.money))
      .filter(_._1.contains("0|102")) // 筛选部分用户
      .aggregate(Aggregations.SUM, 2)
      // .aggregate(Aggregations.MIN, 1)
      // .andMin(1)
    .aggregate(Aggregations.SUM, 2) 计算结果为
    .aggregate(Aggregations.SUM, 2).andMin(1) 计算结果为
    .aggregate(Aggregations.SUM, 2).aggregate(Aggregations.MIN, 1) 计算结果为

要将多个聚合应用于一个 DataSet,必须.and() 在第一个聚合之后使用该函数,这意味着.aggregate(SUM, 0).and(MIN, 2) 产生原始 DataSet 的字段 0 之和和字段 2 的最小值。
与此相反,.aggregate(SUM, 0).aggregate(MIN, 2) 将对聚合应用聚合。

# 4.12 MinBy / MaxBy on Grouped Tuple DataSet(分组元组数据集上的 MinBy / MaxBy)


    .map(o => (o.uid, o.dataUnix, o.money))
    .minBy(1, 2)
  /* 原始数据集

  /* 计算结果为

# 4.13 Reduce on full DataSet(减少完整的 DataSet)

Reduce 转换将用户定义的 reduce 函数应用于 DataSet 的所有元素。reduce 函数随后将成对的元素组合为一个元素,直到仅剩下一个元素为止。

使用 Reduce 转换来减少完整的 DataSet 意味着最终的 Reduce 操作不能并行进行。但是,reduce 函数可以自动组合,因此 Reduce 转换在大多数情况下都不会限制可伸缩性。

val intNumbers = env.fromElements(1,2,3)
val sum = intNumbers.reduce (_ + _)

# 4.14 GroupReduce on full DataSet(完整数据集上的 GroupReduce)

GroupReduce 转换在 DataSet 的所有元素上应用用户定义的 group-reduce 函数。group-reduce 可以迭代 DataSet 的所有元素并返回任意数量的结果元素

val input: DataSet[Int] = // [...]
val output = input.reduceGroup(new MyGroupReducer())

注意:如果 group-reduce 函数不可组合,则无法并行完成对完整 DataSet 的 GroupReduce 转换。 因此,这可能是非常计算密集的操作。请参阅上面有关“可组合的 GroupReduce 函数”的段落,以了解如何实现可组合组简化函数。

# 4.15 GroupCombine on a full DataSet(完整数据集上的 GroupCombine)

完整数据集上的 GroupCombine 与分组数据集上的 GroupCombine 相似。

# 4.16 Aggregate on full Tuple DataSet(完整数据集上聚合)


  • 有一些常用的聚合操作。聚合转换提供以下内置聚合功能:
  • Sum
  • Min, and
  • Max
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.aggregate(SUM, 0).and(MIN, 2)

# 4.17 MinBy/MaxBy on full Tuple DataSet(完整数据集上的 MinBy/MaxBy)


val input: DataSet[(Int, String, Double)] = // [...]
val output: DataSet[(Int, String, Double)] = input                          
                                   .maxBy(0, 2) // select tuple with maximum values for first and third field.

# 4.18 Distinct(去重)

Distinct 转换计算源数据集的不同元素的数据集。以下代码从数据集中删除所有重复的元素:

val input: DataSet[(Int, Double, String)] = // [...]

val output = input.distinct() // 所有元组字段
val output = input.distinct(0,2) // 指定具体元组字段位置
val output = input.distinct {x => Math.abs(x)} // 使用  KeySelector 选择器

// some ordinary POJO
case class CustomType(aName : String, aNumber : Int) { }

val input: DataSet[CustomType] = // [...]
val output = input.distinct("aName", "aNumber") // 指定具体字段
val output = input.distinct("_") // 也可以通过通配符指示使用所有字段:

# 4.19 Join

# 4.20 OuterJoin

# 4.21 Cross

Cross 转换将两个数据集组合为一个数据集。它构建两个输入数据集的元素的所有成对组合,即构建笛卡尔积。
Cross 转换要么在每对元素上调用用户定义的交叉函数,要么输出 Tuple2

# Cross with 自定义函数

    .cross(userLoginDs.filter(_.uid.equals("0|107"))) {
      (c1, c2) => (c1.uid, c2.uid)

# Cross with 数据集大小预估提示

  // crossWithTiny => 告诉系统假定右侧比左侧小很多
    .crossWithTiny(userLoginDs.filter(_.uid.equals("0|107"))) {
      (c1, c2) => (c1.uid, c2.uid)

  // crossWithHuge => 告诉系统假定左侧比右侧小很多
    .crossWithHuge(userLoginDs.filter(_.uid.equals("0|107"))) {
      (c1, c2) => (c1.uid, c2.uid)

# 4.22 CoGroup

CoGroup 转换共同处理两个数据集的组。两个数据集都分组在一个定义的键上,并且两个共享相同键的数据集的组一起交给用户定义的 co-group function。
如果对于一个特定的键,只有一个 DataSet 有一个组,则使用该组和一个空组调用共同组功能。协同功能可以分别迭代两个组的元素并返回任意数量的结果元素。

与 Reduce,GroupReduce 和 Join 相似,可以使用不同的键选择方法来定义键。

val iVals: DataSet[(String, Int)] = // [...]
val dVals: DataSet[(String, Double)] = // [...]

val output = iVals.coGroup(dVals).where(0).equalTo(0) {
  (iVals, dVals, out: Collector[Double]) =>
    val ints = iVals map { _._2 } toSet

    for (dVal <- dVals) {
      for (i <- ints) {
        out.collect(dVal._2 * i)

# 4.23 Union

产生两个必须具有相同类型的数据集的并集。可以通过多个联合调用实现两个以上 DataSet 的联合

val vals1: DataSet[(String, Int)] = // [...]
val vals2: DataSet[(String, Int)] = // [...]
val vals3: DataSet[(String, Int)] = // [...]

val unioned = vals1.union(vals2).union(vals3)

# 4.24 Rebalance(重新平衡)

重要:此操作会通过网络重新整理整个 DataSet。可能会花费大量时间。

val in: DataSet[String] = // [...]
// rebalance DataSet and apply a Map transformation.
val out = in.rebalance().map { ... }

# 4.25 Hash-Partition(哈希分区)

重要:此操作会通过网络重新整理整个 DataSet。可能会花费大量时间。


# 4.26 Range-Partition(范围分区)

重要:此操作需要在 DataSet 上额外传递一次以计算范围,通过网络对整个 DataSet 进行边界划分和改组。这会花费大量时间。


# 4.27 Sort Partition(分区排序)

以指定顺序对指定字段上的 DataSet 的所有分区进行本地排序。可以将字段指定为字段表达式或字段位置。
可以通过链接 sortPartition() 调用在多个字段上对分区进行排序。

    .sortPartition(_.dataUnix, Order.ASCENDING)
    .sortPartition(_.uid, Order.DESCENDING)
    .mapPartition((values: lang.Iterable[RolePay], out: Collector[(String, Int, Double)]) => {
      values.iterator().asScala.foreach(o => out.collect(o.rid, o.dataUnix, o.money))

# 4.28 First-n(前 n 个(任意)元素)

返回数据集的前 n 个(任意)元素。First-n 可以应用于常规数据集,分组的数据集或分组排序的数据集。可以将分组键指定为键选择器功能或字段位置键。

val in: DataSet[(String, Int)] = // [...]
// Return the first five (arbitrary) elements of the DataSet
val out1 = in.first(5)

// Return the first two (arbitrary) elements of each String group
val out2 = in.groupBy(0).first(2)

// Return the first three elements of each String group ordered by the Integer field
val out3 = in.groupBy(0).sortGroup(1, Order.ASCENDING).first(3)
最后修改时间: 2/17/2020, 4:43:04 AM