专栏原创出处:github-源笔记文件 (opens new window)github-源码 (opens new window),欢迎 Star,转载请附上原文出处链接和本声明。 本节内容对应官方文档 (opens new window),本节内容对应示例源码 (opens new window)

# DataSet广播变量

重要信息:一台计算机上的并行任务之间共享广播变量数据结构。修改其内部状态的任何访问都需要由调用者手动同步

示例代码:

/** 广播变量
  *
  * @author Li.Wei by 2019/11/4
  */
object Broadcast extends BatchExecutionEnvironmentApp {

  // 用户登录数据 DataSet
  val userLoginDs = DataSet.userLogin(this)

  // 角色登录数据 DataSet 对应用户 ID,去重
  val roleLoginDs = DataSet.roleLogin(this).map(_.uid).distinct()

  userLoginDs
    .map(new MyBroadcastMap())
    .withBroadcastSet(roleLoginDs, "roleLoginDataSet") // 将数据集作为广播集
    .first(10).withForwardedFields()
    .print()
    
  /* print
  (none,LOGOUT)
  (2|2946,LOGIN)
  (0|1082,LOGOUT)
  (2|2892,LOGOUT)
  (none,LOGIN)
  (2|1835,LOGIN)
  (none,LOGOUT)
  (none,LOGOUT)
  (0|489,LOGOUT)
  (none,LOGOUT)
   */
}

/**
  * 自定义 map 实现函数,[[RichMapFunction]] 中可获取 flink 上下文及执行前后的打开关闭操作
  */
class MyBroadcastMap extends RichMapFunction[UserLogin, (String, String)] {
  var broadcastSet: Traversable[String] = _ // 声明广播变量

  override def open(config: Configuration): Unit = {
    // 赋值广播变量
    import scala.collection.JavaConverters._
    broadcastSet = getRuntimeContext.getBroadcastVariable[String]("roleLoginDataSet").asScala
  }

  // 判断当前用户对应的 ID 在该用户对应角色中是否登录过
  override def map(value: UserLogin): (String, String) =
    if (broadcastSet.exists(_ == value.uid)) (value.uid, value.status) else ("none", value.status)

}
最后修改时间: 12/18/2019, 8:41:00 AM