Dynamically Updating Broadcast Variables in Spark Streaming

In real projects we usually keep configuration in a config file or in third-party storage and read it from the program. The problem is that once such a value changes, the job has to be restarted before the change takes effect, which is painful. Can a change take effect without a restart? Yes: Spark Streaming lets us rebuild and rebroadcast a broadcast variable. Suppose a configuration value needs to be refreshed periodically. We initialize it as a broadcast variable on the driver and read it on the executors (note that executors can only read a broadcast, never modify it). The driver can then rebuild the broadcast on a schedule, every five minutes in the code below, and ship the new value to every executor. The full implementation:

package test

import java.sql.{Connection, DriverManager, ResultSet, Statement}
import java.text.SimpleDateFormat
import java.util.Date
import kafka._ // project-local package that provides PropertiesScalaUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object test3 {

  // Reference to the current broadcast; lives on the driver and is swapped
  // out by update() whenever fresh data is loaded.
  @volatile private var instance: Broadcast[Map[String, Double]] = null
  var kafkaStreams: InputDStream[ConsumerRecord[String, String]] = null
  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.INFO)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.INFO)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.INFO)

    val conf = new SparkConf().setAppName("Spark Streaming TO ES TOPIC")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val scc = new StreamingContext(conf, Seconds(1))

    val topic = PropertiesScalaUtils.loadProperties("topic_combine")
    val topicSet = Set(topic) // the Kafka topic to subscribe to
    val kafkaParams = Map[String, Object](
      "auto.offset.reset" -> "earliest", // or "latest"
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "bootstrap.servers" -> PropertiesScalaUtils.loadProperties("broker"),
      "group.id" -> PropertiesScalaUtils.loadProperties("groupId_es"),
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Initialize the broadcast on the driver before the job starts.
    getInstance(scc.sparkContext)

    kafkaStreams = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicSet, kafkaParams))

    kafkaStreams.foreachRDD { rdd =>
      // foreachRDD bodies run on the driver, so it is safe to rebuild the
      // broadcast here. substring(14, 16) is the minute of the timestamp.
      val current_time = sdf.format(new Date())
      val new_time = current_time.substring(14, 16).toLong
      if (new_time % 5 == 0) {
        // Refresh the broadcast every five minutes. NOTE: with a 1-second
        // batch interval this fires for every batch during a qualifying
        // minute; see the guard sketch after the listing.
        update(rdd.sparkContext, true)
      }
      if (!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // offsets of this batch
        // Capture the current Broadcast reference on the driver so the
        // closure ships it to the executors. Reading the mutable `instance`
        // field directly inside the closure would resolve against the
        // executor-side singleton, where it is still null.
        val warningMap = instance
        rdd.foreachPartition { pr =>
          pr.foreach { pair =>
            val d = pair.value()
            if (warningMap.value.contains(d)) {
              // your processing logic goes here
            }
          }
        }
      }
    }

    scc.start()
    scc.awaitTermination()
  }

  /**
   * Load the url -> WarningPrice mapping from SQL Server.
   */
  def getSqlServerData(): Map[String, Double] = {
    val time = sdf.format(new Date())
    val enter_time = time.substring(0, 10) // date part, presumably used in the real query
    var map = Map[String, Double]()
    var conn: Connection = null
    var stmt: Statement = null
    var rs: ResultSet = null
    val url = "" // JDBC settings and SQL are intentionally left blank here
    val user_name = ""
    val password = ""
    val sql = ""
    try {
      conn = DriverManager.getConnection(url, user_name, password)
      stmt = conn.createStatement
      rs = stmt.executeQuery(sql)
      while (rs.next) {
        val url = rs.getString("url")
        val WarningPrice = rs.getString("WarningPrice").toDouble
        map += (url -> WarningPrice)
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
        println("SQL Server connection failed: " + e)
    } finally {
      // Close resources even if the query fails.
      if (rs != null) rs.close()
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
    map
  }

  /**
   * Replace the broadcast: unpersist the old copy and broadcast fresh data.
   */
  def update(sc: SparkContext, blocking: Boolean = false): Unit = {
    if (instance != null) {
      instance.unpersist(blocking)
      instance = sc.broadcast(getSqlServerData())
    }
  }

  /**
   * Lazily create the broadcast (double-checked locking).
   */
  def getInstance(sc: SparkContext): Broadcast[Map[String, Double]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.broadcast(getSqlServerData())
        }
      }
    }
    instance
  }
}
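One caveat with the `new_time % 5 == 0` check: the batch interval is one second, so the broadcast would be rebuilt for every batch that runs during a minute divisible by five. A minimal driver-side guard that limits the refresh to once per interval could look like the sketch below (the `lastUpdate` and `updateIntervalMs` names are illustrative, not from the original code):

// Driver-side guard: rebuild the broadcast at most once per interval.
@volatile private var lastUpdate = 0L   // timestamp of the last refresh
val updateIntervalMs = 5 * 60 * 1000L   // five minutes

kafkaStreams.foreachRDD { rdd =>
  val now = System.currentTimeMillis()
  if (now - lastUpdate >= updateIntervalMs) {
    update(rdd.sparkContext, blocking = true) // same update() as in the listing
    lastUpdate = now
  }
  // ... per-batch processing as in the listing above ...
}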

Here the lookup data comes from SQL Server: it is broadcast to every executor, and the broadcast is rebuilt every five minutes.
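For reference, getSqlServerData() expects a result set with url and WarningPrice columns; the post deliberately leaves the JDBC URL, credentials, and SQL blank. Purely as an illustration (the host, database, and table name below are invented), the query might take a shape like:

// Hypothetical example only: connection settings and table name are
// assumptions, not from the original post.
val url = "jdbc:sqlserver://<host>:1433;databaseName=<db>"
val sql = "SELECT url, WarningPrice FROM WarningConfig"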

Note: a broadcast variable is read-only. It is defined on the driver, and the executors can only read its value, never modify it.
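In other words, "updating" a broadcast variable really means replacing it: unpersist the old broadcast on the driver and create a new one, which is exactly what update() above does. A minimal standalone sketch, assuming a live SparkContext named sc:

import org.apache.spark.broadcast.Broadcast

var bc: Broadcast[Map[String, Double]] = sc.broadcast(Map("item" -> 1.0))
println(bc.value("item"))             // reading is all that executors (or the driver) can do
bc.unpersist(blocking = true)         // drop the stale copies cached on the executors
bc = sc.broadcast(Map("item" -> 2.0)) // "update" = ship a brand-new Broadcast object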


