精品专区-精品自拍9-精品自拍三级乱伦-精品自拍视频-精品自拍视频曝光-精品自拍小视频

網站建設資訊

NEWS

網站建設資訊

DataSet數據集在使用sql()時,無法使用map,flatMap等轉換算子的解決辦法

摘要

我們提供的服務有:網站設計、成都網站設計、微信公眾號開發、網站優化、網站認證、綏芬河ssl等。為1000+企事業單位解決了網站和推廣的問題。提供周到的售前咨詢和貼心的售后服務,是有科學管理、有技術的綏芬河網站制作公司

我們在使用spark的一個流程是:利用spark.sql()函數把數據讀入到內存形成DataSet[Row](DataFrame)由于Row是新的spark數據集中無法實現自動的編碼,需要對這個數據集進行編碼,才能利用這些算子進行相關的操作,如何編碼是一個問題,在這里就把這幾個問題進行總結一下。報的錯誤:error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

報這個錯誤一般就是我們在使用算子時其返回值的數據類型往往不是spark通過自身的反射能完成的自動編碼部分,比如通過map算子,我們在map算子的函數的返回值類型是Map類型的,就會出現上面的問題,因為Map集合類不在:基本的類型和String,case class和元組的范圍之內,spark內部不能通過反射完成自動編碼。

出現這個問題的原因

spark2.0以后的版本采用的是新的分布式數據集DataSet,其中DataFrame是DataSet[Row]的別名形式。而新的數據集采用了很多的優化,其中一個就是利用了Tungsten execution engine的計算引擎,這個計算引擎采用了很多的優化。其中一個就是自己維護了一個內存管理器,從而使計算從java jvm解脫出來了,使得內存的優化得到了很大的提升。同時新的計算引擎,把數據存儲在內存中是以二進制的形式存儲的,大部分所有的計算都是在二進制數據流上進行的,不需要把二進制數據流反序列化成java對象,然后再把計算的結果序列化成二進制數據流,而是直接在二進制流上進行操作,這樣的情況就需要我們存在一種機制就是java對象到二進制數據流的映射關系,不然我們不知道二進制流對應的數據對象是幾個字節,spark這個過程是通過Encoders來完成的,spark自身通過反射完成了一部分的自動編碼過程:基本的類型和String,case class和元組,對于其他的集合類型或者我們自定義的類,他是無法完成這樣的編碼的。需要我們自己定義這樣的編碼也就是讓其擁有一個schema。

DataSet數據集在使用sql()時,無法使用map,flatMap等轉換算子的解決辦法

解決這個問題方式

方法一:

這樣就是把其轉化為RDD,利用RDD進行操作,但是不建議用這個,相對于RDD,DataSet進行了很多的底層優化,擁有很不錯性能


val orderInfo1 = spark.sql(

 """

   |SELECT

   |o.id,

   |o.user_id

   |FROM default.api_order o

   |limit 100

 """.stripMargin).rdd.map(myfunction)



方法二:

讓其自動把DataSet[Row]轉化為DataSet[P],如果Row里面有復雜的類型出現的話。


case class Orders(id: String, user_id: String)

//這個case class要定義在我們的單例對象的外面

object a {

def main(args: Array[String]): Unit ={

import spark.implicits._

val orderInfo1 = spark.sql(

 """

   |SELECT

   |o.id,

   |o.user_id

   |FROM default.api_order o

   |limit 100

 """.stripMargin).as[Orders].map(myfunction)

}

}



方式三:

自定義一個schema,然后利用RowEncoder進行編碼。這只是一個例子,里面的類型其實都可以通過spark的反射自動完成編碼過程。


import spark.implicits._

val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true))))

val encoders = RowEncoder(schema)

val orderInfo1 = spark.sql(

 """

   |SELECT

   |o.id,

   |o.user_id

   |FROM default.api_order o

   |limit 100

 """.stripMargin).map(row => row)(encoders)


方法四:

直接利用scala的模式匹配的策略case Row來進行是可以通過的,原因是case Row()scala模式匹配的知識,這樣可以知道集合Row里面擁有多少個基本的類型,則可以通過scala就可以完成對Row的自動編碼,然后可以進行相應的處理。


import spark.implicits._

val orderInfo1 = spark.sql(

 """

   |SELECT

   |o.id,

   |o.user_id

   |FROM default.api_order o

   |limit 100

 """.stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)}

這個得到的schema為:

orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string]

如果換成這樣:

val orderInfo1 = spark.sql(

 """

   |SELECT

   |o.id,

   |o.user_id

   |FROM default.api_order o

   |limit 100

 """.stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)}

得到的schema為:

orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array]

可以看出:spark是把元祖看成case class一種特殊形式擁有,schame的字段名稱為_1,_2這樣的特殊case clase



網站名稱:DataSet數據集在使用sql()時,無法使用map,flatMap等轉換算子的解決辦法
URL網址:http://m.jcarcd.cn/article/ppgihc.html
主站蜘蛛池模板: 91视频国产地址 | 欧美与黑人 | 午夜免费福利体验 | 97超级碰碰碰 | 女同精品一区二区 | 日韩精品欧美 | 日本免费三片在 | 日韩一级性生活 | 91成人短视频在线 | 欧美日韩在线一区 | 绿帽视频网站 | 国产偷窥不卡视频 | 欧洲亚洲精 | 九色蝌蚪首页 | 日本妇人 | 国产一区欧美二区 | 欧美日韩国产在线 | 日韩欧美亚 | 日本高清色 | 日本成人一区 | 国产亚洲精品aa | 91综合网。 | 国产精品丝袜高跟鞋 | 欧美系列国产系列一 | 韩国视频一 | 国内国外精品一区二 | 日韩精品a人综合 | 无码精品人妻一区二区三区影院 | 国产日本精品视频 | 成人一级免费激情网 | 理论精品电影 | 国产91影院| 欧美午夜理伦 | 99这里只有精品 | 精品免费在线观看 | 飘花在线影院 | 日本在线看免费 | 日本三级国产在线 | 91污污 | 无码h成年动漫在线观看 | 97超级碰碰碰电影 |