精品专区-精品自拍9-精品自拍三级乱伦-精品自拍视频-精品自拍视频曝光-精品自拍小视频

網站建設資訊

NEWS

網站建設資訊

怎么實現SparkStreaming轉化操作

怎么實現SparkStreaming轉化操作,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。

成都創新互聯長期為1000+客戶提供的網站建設服務,團隊從業經驗10年,關注不同地域、不同群體,并針對不同對象提供差異化的產品和服務;打造開放共贏平臺,與合作伙伴共同營造健康的互聯網生態環境。為南平企業提供專業的網站制作、網站建設南平網站改版等技術服務。擁有10年豐富建站經驗和眾多成功案例,為您定制開發。

DStream的轉化操作分為無狀態有狀態兩種

  • 在無狀態轉化操作中,每個批次的處理不依賴于之前批次的數據。

  • 有狀態轉化操作需要使用之前批次的數據或者中間結果來計算當前批次的數據,有狀態轉化操作包括基于滑動窗口的轉化操作和追蹤狀態變化的轉換操作。

無狀態轉化


無狀態轉化操作的實質就說把簡單的RDD轉化操作應用到每個批次上,也就是轉化DStream的每一個RDD

Transform算子

Transform 允許 DStream 上執行任意的 RDD-to-RDD 函數。即使這些函數并沒有在 DStream 的 API 中暴露出來,通過該函數可以方便的擴展 Spark API。該函數每一批次調度一次。其實也 就是對 DStream 中的 RDD 應用轉換。

def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("transform").setMaster("local[*]")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(3))
    val lines = sc.socketTextStream("localhost", 9999)

    // transform方法可以將底層RDD獲取到后進行操作
    // 1. DStream功能不完善
    // 2. 需要代碼周期性的執行

    // Code : Driver端
    val newDS: DStream[String] = lines.transform(
      rdd => {
        // Code : Driver端,(周期性執行)
        rdd.map(
          str => {
            // Code : Executor端
            str
          }
        )
      }
    )
    // Code : Driver端
    val newDS1: DStream[String] = lines.map(
      data => {
        // Code : Executor端
        data
      }
    )
    sc.start()
    sc.awaitTermination()
  }

join算子

兩個流之間的 join 需要兩個流的批次大小一致,這樣才能做到同時觸發計算。計算過程就是對當前批次的兩個流中各自的 RDD 進行 join,與兩個 RDD 的 join 效果相同。

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val data9999 = ssc.socketTextStream("localhost", 9999)
    val data8888 = ssc.socketTextStream("localhost", 8888)

    val map9999: DStream[(String, Int)] = data9999.map((_,9))
    val map8888: DStream[(String, Int)] = data8888.map((_,8))

    // 所謂的DStream的Join操作,其實就是兩個RDD的join
    val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)

    joinDS.print()

    ssc.start()
    ssc.awaitTermination()
}

有狀態轉化


有狀態轉化操作是跨時間區間跟蹤數據的操作,也就是說,一些先前批次的數據也被用來在新的批次中用于計算結果。有狀態轉換的主要的兩種類型:

  • 滑動窗口:以一個時間階段為滑動窗口進行操作

  • updateStateByKey():通過key值來跟蹤數據的狀態變化

有狀態轉化操作需要在StreamingContext中打開檢查點機制來提高容錯

updateStateByKey

  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("updateStateByKey")
    val sc: StreamingContext = new StreamingContext(conf, Seconds(4))
    sc.checkpoint("cp")
    val ds: ReceiverInputDStream[String] = sc.socketTextStream("localhost", 9999)

    val value: DStream[(String, Int)] = ds.map(((_: String), 1))


    // updateStateByKey:根據key對數據的狀態進行更新
    // 傳遞的參數中含有兩個值
    // 第一個值表示相同的key的value數據的集合
    // 第二個值表示緩存區key對應的計算值
    val state: DStream[(String, Int)] = value.updateStateByKey((seq: Seq[Int], option: Option[Int]) => {
      val newCount: Int = option.getOrElse(0) + seq.sum
      Option(newCount)
    })

    state.print()

    sc.start()
    sc.awaitTermination()

  }

窗口

所有基于窗口的函數都需要兩個參數,分別對應窗口時長滑動步長,并且兩者都必須是SparkStreaming的批次間隔的整數倍。
窗口時長控制的是每次用來計算的批次的個數
滑動步長用于控制對新的DStream進行計算的間隔

怎么實現SparkStreaming轉化操作

window操作

基于window進行窗口內元素計數操作

def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6), Seconds(6))

    val wordToCount = windowDS.reduceByKey(_+_)

    wordToCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
reduce操作

有逆操作規約是一種更高效的規約操作,通過只考慮新進入窗口的元素和離開窗口的元素,讓spark增量計算歸約的結果,其在代碼上的體現就是reduceFuncinvReduceFunc

怎么實現SparkStreaming轉化操作

普通歸約操作

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)

    lines.reduceByWindow(
      (x: String, y: String) => {
        x + "-" + y
      },
      Seconds(9), Seconds(3)
    ).print()

    ssc.start()
    ssc.awaitTermination()
  }

有逆歸約操作

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)
    val wordToOne = lines.map((_,1))

    /**
     * 基于窗口進行有逆歸約:通過控制窗口流出和進入的元素來提高性能
     */
    val windowDS: DStream[(String, Int)] =
    wordToOne.reduceByKeyAndWindow(
      (x:Int, y:Int) => { x + y},
      (x:Int, y:Int) => {x - y},
      Seconds(9), Seconds(3))

    windowDS.print()

    ssc.start()
    ssc.awaitTermination()
  }
count操作
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    val ssc = new StreamingContext(sparkConf, Seconds(3))
    ssc.checkpoint("cp")

    val lines = ssc.socketTextStream("localhost", 9999)

    /**
     * 統計窗口中輸入數據的個數
     * 比如 3s內輸入了10條數據,則打印10
     */
    val countByWindow: DStream[Long] = lines.countByWindow(
      Seconds(9), Seconds(3)
    )
    countByWindow.print()

    /**
     * 統計窗口中每個值的個數
     * 比如 3s內輸入了1個3 2個4 3個5,則打印(3,1)(2,4)(3,5)
     */
    val countByValueAndWindow: DStream[(String, Long)] = lines.countByValueAndWindow(
      Seconds(9), Seconds(3)
    )
    countByValueAndWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }

看完上述內容,你們掌握怎么實現SparkStreaming轉化操作的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注創新互聯行業資訊頻道,感謝各位的閱讀!


網站名稱:怎么實現SparkStreaming轉化操作
文章起源:http://m.jcarcd.cn/article/poegod.html
主站蜘蛛池模板: 日韩伦理在线播放成 | 日本成人中文字幕 | 天美传媒在线观看果 | 日韩欧美一区黑 | 国产精品三三级在线 | 日韩国产在线视频 | 另类人兽第一页 | 日韩欧美午夜视频 | 国精品午夜福 | 欧美在线观看不卡 | 国产精品视频网 | 欧美一级日韩一级 | 精品无人区 | 女同69互| 成人区一区 | 福利影院在线看 | 日韩97在线 | 绿帽在线| 97视频在线播放 | 午夜91| 国产情侣自拍小视频 | 日韩国产网曝 | 日本三级网站 | 国产欧美高 | 国产一区二区自拍 | 精品国产人成亚洲区 | 日韩欧美中文在线 | 欧美亚洲日本国产 | 不卡无在一区 | 精品在线免费播放 | 91人人人人伦理片 | 国产日产欧产精品 | 国产美女91 | 日本在线成人短视频 | 精品尤物视频 | 国产香蕉尹人视频在 | 国产日产高| 日韩a级一片 | 国内自拍亚洲 | 欧美整片sss | 午夜一区 |