欧美成人午夜免费全部完,亚洲午夜福利精品久久,а√最新版在线天堂,另类亚洲综合区图片小说区,亚洲欧美日韩精品色xxx

千鋒扣丁學(xué)堂大數(shù)據(jù)培訓(xùn)之Spark的Broadcast單例模式

2019-06-18 14:03:39 2184瀏覽

今天千鋒扣丁學(xué)堂大數(shù)據(jù)培訓(xùn)老師給大家分享一篇關(guān)于Spark的Broadcast要用單例模式詳解,首先很多用SparkStreaming的朋友應(yīng)該使用過(guò)broadcast,大多數(shù)情況下廣播變量都是以單例模式聲明的有沒(méi)有粉絲想過(guò)為什么?浪尖在這里幫大家分析一下,有以下幾個(gè)原因:



廣播變量大多數(shù)情況下是不會(huì)變更的,使用單例模式可以減少sparkstreaming每次job生成執(zhí)行,重復(fù)生成廣播變量帶來(lái)的開(kāi)銷。

單例模式也要做同步。這個(gè)對(duì)于很多新手來(lái)說(shuō)可以不用考慮同步問(wèn)題,原因很簡(jiǎn)單因?yàn)樾率植粫?huì)調(diào)整spark程序task的調(diào)度模式,而默認(rèn)采用FIFO的調(diào)度模式,基本不會(huì)產(chǎn)生并發(fā)問(wèn)題。1).假如你配置了Fair調(diào)度模式,同時(shí)修改了SparkStreaming運(yùn)行的并行執(zhí)行的job數(shù),默認(rèn)為1,那么就要加上同步代碼了。2).還有一個(gè)原因,在多輸出流的情況下共享broadcast,同時(shí)配置了Fair調(diào)度模式,也會(huì)產(chǎn)生并發(fā)問(wèn)題。

注意。有些時(shí)候比如廣播配置文件,規(guī)則等需要變更broadcast,在使用fair的時(shí)候可以在foreachrdd里面使用局部變量作為廣播,避免相互干擾。

先看例子,后面逐步揭曉內(nèi)部機(jī)制。

1.例子

下面是一個(gè)雙重檢查式的broadcast變量的聲明方式。

object WordBlacklist { 
 
  @volatile private var instance: Broadcast[Seq[String]] = null 
 
  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { 
    if (instance == null) { 
      synchronized { 
        if (instance == null) { 
          val wordBlacklist = Seq("a", "b", "c") 
          instance = sc.broadcast(wordBlacklist) 
        } 
      } 
    } 
    instance 
  } 
} 

廣播變量的使用方法如下:

val lines = ssc.socketTextStream(ip, port) 
    val words = lines.flatMap(_.split(" ")) 
    val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) => 
      // Get or register the blacklist Broadcast 
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext) 
      // Get or register the droppedWordsCounter Accumulator 
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) 
      // Use blacklist to drop words and use droppedWordsCounter to count them 
      val counts = rdd.filter { case (word, count) => 
        if (blacklist.value.contains(word)) { 
          droppedWordsCounter.add(count) 
          false 
        } else { 
          true 
        } 
      }.collect().mkString("[", ", ", "]") 
      val output = s"Counts at time $time $counts" 
      println(output) 
      println(s"Dropped ${droppedWordsCounter.value} word(s) totally") 
      println(s"Appending to ${outputFile.getAbsolutePath}") 
      Files.append(output + "\n", outputFile, Charset.defaultCharset()) 
    } 

2.概念補(bǔ)充

為啥Spark的Broadcast要用單例模式

首先,一個(gè)基本概念就是Spark應(yīng)用程序從開(kāi)始提交到task執(zhí)行分了很多層。

應(yīng)用調(diào)度器。主要是資源管理器,比如standalone,yarn等負(fù)責(zé)Spark整個(gè)應(yīng)用的調(diào)度和集群資源的管理。

job調(diào)度器。spark的算子分為主要兩大類,transform和action,其中每一個(gè)action都會(huì)產(chǎn)生一個(gè)job。這個(gè)job需要在executor提供的資源池里調(diào)度執(zhí)行,當(dāng)然并不少直接調(diào)度執(zhí)行job。

stage劃分及調(diào)度。job具體會(huì)劃分為若干stage,這個(gè)就有一個(gè)基本的概念就是寬依賴和窄依賴,寬依賴就會(huì)劃分stage。stage也需要調(diào)度執(zhí)行,從后往前劃分,從前往后調(diào)度執(zhí)行。

task切割及調(diào)度。stage往下繼續(xù)細(xì)化就是會(huì)根據(jù)不太的并行度劃分出task集合,這個(gè)就是在executor上調(diào)度執(zhí)行的基本單元,目前的調(diào)度默認(rèn)是一個(gè)task一個(gè)cpu。

SparkStreaming的job生成是周期性的。當(dāng)前job的執(zhí)行時(shí)間超過(guò)生成周期就會(huì)產(chǎn)生job累加。累加一定數(shù)目的job后有可能會(huì)導(dǎo)致應(yīng)用程序失敗。這個(gè)主要原因是由于FIFO的調(diào)度模式和SparkStreaming的默認(rèn)單線程的job執(zhí)行機(jī)制

3.SparkStreamingjob生成

這個(gè)源碼主要入口是StreamingContext#JobScheduler#JobGenerator對(duì)象,內(nèi)部有個(gè)RecurringTimer,主要負(fù)責(zé)按照批處理時(shí)間周期產(chǎn)生GenrateJobs事件,當(dāng)然在存在windows的情況下,該周期有可能不會(huì)生成job,要取決于滑動(dòng)間隔,有興趣自己去揭秘,浪尖星球里分享的視頻教程里講到了。具體代碼塊如下

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 
   longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") 

我們直接看其實(shí)現(xiàn)代碼塊:

eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 
 
      override protected def onError(e: Throwable): Unit = { 
        jobScheduler.reportError("Error in job generator", e) 
      } 
    } 
    eventLoop.start() 

event處理函數(shù)是processEvent方法

/** Processes all events */ 
  private def processEvent(event: JobGeneratorEvent) { 
    logDebug("Got event " + event) 
    event match { 
      case GenerateJobs(time) => generateJobs(time) 
      case ClearMetadata(time) => clearMetadata(time) 
      case DoCheckpoint(time, clearCheckpointDataLater) => 
        doCheckpoint(time, clearCheckpointDataLater) 
      case ClearCheckpointData(time) => clearCheckpointData(time) 
    } 
  } 

在接受到GenerateJob事件的時(shí)候,會(huì)執(zhí)行g(shù)enerateJobs代碼,就是在該代碼內(nèi)部產(chǎn)生和調(diào)度job的。

/** Generate jobs and perform checkpointing for the given `time`.  */ 
  private def generateJobs(time: Time) { 
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are 
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847). 
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") 
    Try { 
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch 
      graph.generateJobs(time) // generate jobs using allocated block 
    } match { 
      case Success(jobs) => 
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
      case Failure(e) => 
        jobScheduler.reportError("Error generating jobs for time " + time, e) 
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e) 
    } 
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 
  } 

可以看到代碼里首先會(huì)執(zhí)行job生成代碼

graph.generateJobs(time) 
 
具體代碼塊兒 
 
def generateJobs(time: Time): Seq[Job] = { 
    logDebug("Generating jobs for time " + time) 
    val jobs = this.synchronized { 
      outputStreams.flatMap { outputStream => 
        val jobOption = outputStream.generateJob(time) 
        jobOption.foreach(_.setCallSite(outputStream.creationSite)) 
        jobOption 
      } 
    } 
    logDebug("Generated " + jobs.length + " jobs for time " + time) 
    jobs 
  } 

每個(gè)輸出流都會(huì)生成一個(gè)job,輸出流就類似于foreachrdd,print這些。其實(shí)內(nèi)部都是ForEachDStream。所以生成的是一個(gè)job集合。

然后就會(huì)將job集合提交到線程池里去執(zhí)行,這些都是在driver端完成的哦。

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 
 
具體h函數(shù)內(nèi)容 
def submitJobSet(jobSet: JobSet) { 
    if (jobSet.jobs.isEmpty) { 
      logInfo("No jobs added for time " + jobSet.time) 
    } else { 
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) 
      jobSets.put(jobSet.time, jobSet) 
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) 
      logInfo("Added jobs for time " + jobSet.time) 
    } 
  } 

其實(shí)就是遍歷生成的job集合,然后提交到線程池jobExecutor內(nèi)部執(zhí)行。這個(gè)也是在driver端的哦。

jobExecutor就是一個(gè)固定線程數(shù)的線程池,默認(rèn)是1個(gè)線程。

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 
  private val jobExecutor = 
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") 

需要的話可以配置spark.streaming.concurrentJobs來(lái)同時(shí)提交執(zhí)行多個(gè)job。

那么這種情況下,job就可以并行執(zhí)行了嗎?

顯然不是的!

還要修改一下調(diào)度模式為Fair,詳細(xì)的配置可以參考:

http://spark.apache.org/docs/2.3.3/job-scheduling.html#scheduling-within-an-application

簡(jiǎn)單的均分的話只需要

conf.set("spark.scheduler.mode", "FAIR") 

然后,同時(shí)運(yùn)行的job就會(huì)均分所有executor提供的資源。

這就是整個(gè)job生成的整個(gè)過(guò)程了哦。

因?yàn)镾parkStreaming的任務(wù)存在Fair模式下并發(fā)的情況,所以需要在使用單例模式生成broadcast的時(shí)候要注意聲明同步。

以上就是關(guān)于千鋒扣丁學(xué)堂大數(shù)據(jù)培訓(xùn)之Spark的Broadcast單例模式的全部?jī)?nèi)容,想要了解更多關(guān)于大數(shù)據(jù)開(kāi)發(fā)方面內(nèi)容的小伙伴,請(qǐng)關(guān)注扣丁學(xué)堂大數(shù)據(jù)培訓(xùn)官網(wǎng)、微信等平臺(tái),扣丁學(xué)堂IT職業(yè)在線學(xué)習(xí)教育有專業(yè)的大數(shù)據(jù)講師為您指導(dǎo),此外扣丁學(xué)堂老師精心推出的大數(shù)據(jù)視頻教程定能讓你快速掌握大數(shù)據(jù)從入門(mén)到精通開(kāi)發(fā)實(shí)戰(zhàn)技能??鄱W(xué)堂大數(shù)據(jù)學(xué)習(xí)群:209080834。


扣丁學(xué)堂微信公眾號(hào)                          Python全棧開(kāi)發(fā)爬蟲(chóng)人工智能機(jī)器學(xué)習(xí)數(shù)據(jù)分析免費(fèi)公開(kāi)課直播間


      【關(guān)注微信公眾號(hào)獲取更多學(xué)習(xí)資料】         【掃碼進(jìn)入Python全棧開(kāi)發(fā)免費(fèi)公開(kāi)課】



查看更多關(guān)于“大數(shù)據(jù)培訓(xùn)資訊”的相關(guān)文章>

標(biāo)簽: 大數(shù)據(jù)培訓(xùn) 大數(shù)據(jù)視頻教程 大數(shù)據(jù)分析培訓(xùn) 大數(shù)據(jù)學(xué)習(xí)視頻 Hadoop生態(tài)圈

熱門(mén)專區(qū)

暫無(wú)熱門(mén)資訊

課程推薦

微信
微博
15311698296

全國(guó)免費(fèi)咨詢熱線

郵箱:codingke@1000phone.com

官方群:148715490

北京千鋒互聯(lián)科技有限公司版權(quán)所有   北京市海淀區(qū)寶盛北里西區(qū)28號(hào)中關(guān)村智誠(chéng)科創(chuàng)大廈4層
京ICP備2021002079號(hào)-2   Copyright ? 2017 - 2022
返回頂部 返回頂部