spark中使用groupByKey進行分組排序的示例代碼
任務需求:已知RDD[(query:String, item_id:String, imp:Int, clk:Int)],要求找到每個query對應的點擊最多的前2個item_id,即:按照query分組,并按照clk降序排序,每組取前兩個。
(相關資料圖)
例如:
(連衣裙,1234, 22, 13)
(牛仔褲,2768, 34, 7)
(連衣裙,1673,45, 9)
(襯衣,3468, 67, 12)
(牛仔褲,2754, 68, 20)
(連衣裙,1976,93, 29)
希望得到:
(連衣裙,1976,93, 29)
(連衣裙,1234, 22, 13)
(牛仔褲,2754, 68, 20)
(牛仔褲,2768, 34, 7)
(襯衣,3468, 67, 12)
先看一個錯誤的版本:
val list = List(("連衣裙",1234, 22, 13),("牛仔褲",2768, 34, 7),("連衣裙",1673,45, 9) ,("襯衣",3468,67, 12),("牛仔褲",2754, 68, 20),("連衣裙",1976,93, 29)) val rdd = ss.sparkContext.parallelize(list) val topItem_set= rdd.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey() .map(line => { val topItem = line._2.toArray.sortBy(_._3)(Ordering[Int].reverse).take(2) topItem.mkString(",") topItem.map(x => {(line._1, x._1, x._2, x._3)}) }) topItem_set.foreach(println) println() topItem_set.map(_.mkString).foreach(println)
我們把query作為key,其余放到一起,groupByKey后(map之前),類型為:RDD[(String, Iterable[(String, Int, Int)])],根據(jù)query分組再map,line._2.toArray把Iterable轉為Array,sortBy(_._3)是按最后一個Int即clk排序,(Ordering[Int].reverse)表示從大到?。╯ortBy默認從小到大,注意這里的sortBy是Array的成員函數(shù)而不是rdd的sortBy,用法比較不同),take(2)是取前2個,然后返回(query, item_id)。跑一下上面的過程。
返回:
[Lscala.Tuple4;@2b672e4 [Lscala.Tuple4;@52e50126 [Lscala.Tuple4;@1362b124 (連衣裙,1976,93,29)(連衣裙,1234,22,13) (襯衣,3468,67,12) (牛仔褲,2754,68,20)(牛仔褲,2768,34,7)
上面3行是直接打印跟預期稍有差別,同一個key下的top兩個元素是作為一個整體,但已經(jīng)很接近目標,如果希望拆分,需要使用flatMap:
val topItem_set= rdd.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey() .flatMap(line => { val topItem = line._2.toArray.sortBy(_._3)(Ordering[Int].reverse).take(2) topItem.mkString(",") topItem.map(x => {(line._1, x._1, x._2, x._3)}) })
為什么呢?GroupByKey后,類型為RDD[(String, Iterable[(String, Int, Int)])],如果用map,那每一個key對應的一個Iterable變量,相當于一條數(shù)據(jù),map后的結果自然還是一條。但flatMap,相當于map+flat操作,這才是我們真正的需要的形式。
任務進階:要求找到每個query對應的點擊最多的前2個item_id,當點擊一樣時,選曝光最少的,即:按照query分組,并優(yōu)先按照clk降序排序,其次按照imp升序排序,每組取前兩個。
例如:
(連衣裙,1234, 22, 13)
(牛仔褲,2768, 34, 7)
(連衣裙,1673,45, 9)
(襯衣,3468, 67, 12)
(牛仔褲,2754, 68, 20)
(連衣裙,1976,93, 29)
(牛仔褲,1232, 20, 7)
希望得到:
(連衣裙,1976,93, 29)
(連衣裙,1234, 22, 13)
(牛仔褲,2754, 68, 20)
(牛仔褲,1232, 20, 7)
(襯衣,2768, 34, 7)
注意,上面樣本中牛仔褲有兩個樣本的點擊都是7,但標紅的樣本曝光數(shù)是更小,所以應該入選top2,直接上代碼吧:
val list2 = List(("連衣裙",1234, 22, 13),("牛仔褲",2768, 34, 7),("連衣裙",1673,45, 9) ,("襯衣",3468,67, 12),("牛仔褲",2754, 68, 20),("連衣裙",1976,93, 29),("牛仔褲",1232, 20, 7)) val rdd2 = ss.sparkContext.parallelize(list2) rdd2.foreach(println) val topItem_set= rdd2.map(ele => (ele._1, (ele._2, ele._3, ele._4))).groupByKey() .flatMap(line => { val topItem = line._2.toArray.sortBy(x => (x._3, x._2))(Ordering.Tuple2(Ordering[Int].reverse, Ordering[Int])).take(2) topItem.map(x => {(line._1, x._1, x._2, x._3)}) }) topItem_set.foreach(println)
sortBy可以根據(jù)需要增加排序維度,參數(shù)按優(yōu)先級排列,這個在日常使用較多。
到此這篇關于spark中使用groupByKey進行分組排序的文章就介紹到這了,更多相關spark使用groupByKey分組排序內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
標簽:
為您推薦
-
“請大家閉上眼睛,把注意力從頭頂‘漫游’到眉頭……”8月底,南部戰(zhàn)區(qū)陸軍某旅駐島某海防連課室內,官...
2021-09-18
-
辦房本要找“黃牛”、應急管理部門涉嫌搞壟斷、行業(yè)協(xié)會以辦理車輛登記上牌備案之名借機收費斂財、中小...
2021-09-18
-
人民網(wǎng)北京9月17日電 (記者溫璐、宋子節(jié))今日,國務院新聞辦就扎實做好民政在全面小康中的兜底夯基工...
2021-09-18
-
人民網(wǎng)杭州9月17日電 (記者孫博洋)9月16日至17日,中國質量(杭州)大會在浙江杭州舉行。在16日舉行...
2021-09-18
-
人民網(wǎng)北京9月17日電 (記者王連香)據(jù)交通運輸部消息,全國網(wǎng)約車監(jiān)管信息交互平臺統(tǒng)計,截至2021年8...
2021-09-18
-
新聞發(fā)布會現(xiàn)場。海關總署供圖人民網(wǎng)北京9月17日電 (記者栗翹楚)9月16日,海關總署舉行新聞發(fā)布會,...
2021-09-18
-
人民網(wǎng)杭州9月17日電 (記者孫博洋)9月16日至17日,中國質量(杭州)大會在浙江杭州舉行。在16日舉行...
2021-09-18