专栏原创出处:github-源笔记文件 (opens new window) ,github-源码 (opens new window),欢迎 Star,转载请附上原文出处链接和本声明。
# DataSet分配唯一标识符
在某些算法中,可能需要为数据集元素分配唯一标识符。[[org.apache.flink.api.scala.utils.DataSetUtils]] scala 包装类
具体实现源码可参考 DataSetUtils (opens new window)
# zipWithIndex 方式分配
为元素分配连续的标签,接收数据集作为输入并返回 DataSet[(Long, T)] 2 元组的新数据集。
此过程需要两步操作,首先是计数,然后是标记元素,由于计数同步,因此无法进行流水线处理。
替代方法 zipWithUniqueId 以流水线方式工作,当唯一的标签足够时,它是首选方法。
# zipWithUniqueId 方式分配
在许多情况下,可能不需要分配连续的标签。
zipWithUniqueId 以管道方式工作,加快了标签分配过程。
此方法接收一个数据集作为输入,并返回一个新的 DataSet[(Long, T)] 2 元组数据集
代码示例 ZippingElements (opens new window) :
import io.gourd.flink.scala.api.BatchExecutionEnvironmentApp
/** 在某些算法中,可能需要为数据集元素分配唯一标识符。
* 本文档说明了如何将
* [[org.apache.flink.api.scala.utils.DataSetUtils]]
* [[org.apache.flink.api.java.utils.DataSetUtils.zipWithIndex()]]
* [[org.apache.flink.api.java.utils.DataSetUtils.zipWithUniqueId()]]
* 用于此目的。
*
* @author Li.Wei by 2019/11/12
*/
object ZippingElements extends BatchExecutionEnvironmentApp {
import org.apache.flink.api.scala._
val input: DataSet[String] = bEnv.fromElements("A", "B", "C", "D", "E", "F", "G", "H")
bEnv.setParallelism(2)
/*
zipWithIndex 为元素分配连续的标签,接收数据集作为输入并返回 DataSet[(Long, T)] 2 元组的新数据集。
此过程需要两步操作,首先是计数,然后是标记元素,由于计数同步,因此无法进行流水线处理。
替代方法 zipWithUniqueId 以流水线方式工作,当唯一的标签足够时,它是首选方法。
*/
import org.apache.flink.api.scala.utils.DataSetUtils
input.zipWithIndex.print()
/*
(0,A)
(1,B)
(2,C)
(3,D)
(4,E)
(5,F)
(6,G)
(7,H)
*/
println()
/*
在许多情况下,可能不需要分配连续的标签。
zipWithUniqueId 以管道方式工作,加快了标签分配过程。
此方法接收一个数据集作为输入,并返回一个新的 DataSet[(Long, T)] 2 元组数据集
本机执行,未发生并行,实际情况参考分布式测试结果
*/
input.zipWithUniqueId.print()
/*
(0,A)
(1,B)
(2,C)
(3,D)
(4,E)
(5,F)
(6,G)
(7,H)
*/
}
← DataSet概览 DataSet参数传递 →