Spark-分组TOPN算法

该数据集都为:“http://bigdata.edu360.cn/laozhou” 这个样子,需求是找到每个学科下最受欢迎的老师
方法一:

/**
  * 数据放到scala 集合里面进行操作
  */
object GroupFavTeacher_1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("FavTeacher").setMaster("local")
    val sc = new SparkContext(conf)
    //指定以后从哪里读取数据
    val lines = sc.textFile(args(0))
    //整理数据
    val subject_teacherAndOne = lines.map(line => {
      //val line = "http://bigdata.edu360.cn/laoyu"
      val conSubject = line.split("/")(2)
      val subject =conSubject.split("[.]")(0)
      val teacher = line.split("/")(3)
      ((subject, teacher),1)
    })
    //聚合,将学科和老师联合当做key
    val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_)
    //分组排序(按学科进行分组)
    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
    //经过分组后,一个分区内可能有多个学科的数据,一个学科就是一个迭代器
    //将每一个组拿出来进行操作v
    //为什么可以调用sacla的sortby方法呢?因为一个学科的数据已经在一个scala集合里面了
    val sorted = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(3))
    val resulted = sorted.collect()
    //收集
    println(resulted.toBuffer)
    sc.stop()
  }
}

  • 方法二:
/**
  * 先过滤再统计计算
  */
object GroupFavTeacher_2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GroupFavTeacher_2").setMaster("local")
    val sc = new SparkContext(conf)
    //val topN = args(1).toInt
    val subjects = Array("bigdata", "javaee", "php")
    //指定以后从哪里读取数据
    val lines = sc.textFile(args(0))
    //整理数据
    val subject_teacherAndOne = lines.map(line => {
      //val line = "http://bigdata.edu360.cn/laozhang"
      val conSubject = line.split("/")(2)
      val subject =conSubject.split("[.]")(0)
      val teacher = line.split("/")(3)
      ((subject, teacher),1)
    })
    //聚合,将学科和老师联合当做key
    val reduced: RDD[((String,String), Int)] = subject_teacherAndOne.reduceByKey(_+_)
    //分组排序(按学科进行分组)
    val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)

    //scala的集合排序是在内存中进行的,但是内存有可能不够用
    //可以调用RDD的sortby方法,内存+磁盘进行排序
    for(sb <- subjects) {
      //该RDD中对应的数据仅有一个学科的数据(因为过滤过了)
      val filted = grouped.filter(_._1 == sb)
      //现在调用的是RDD的sortBy方法,(take是一个action,会触发任务提交)
      val filtedResulted = filted.sortBy(_._2, false).take(3)
      println(filtedResulted.toBuffer)
    }
    sc.stop()
  }
}
  • 方法三:
/**
*自定义分区器(k,v)
*
/
object GroupFavTeacher3 {

  def main(args: Array[String]): Unit = {

    val topN = args(1).toInt

    val conf = new SparkConf().setAppName("GroupFavTeacher2").setMaster("local[4]")
    val sc = new SparkContext(conf)

    //指定以后从哪里读取数据
    val lines: RDD[String] = sc.textFile(args(0))
    //整理数据
    val sbjectTeacherAndOne: RDD[((String, String), Int)] = lines.map(line => {
      val index = line.lastIndexOf("/")
      val teacher = line.substring(index + 1)
      val httpHost = line.substring(0, index)
      val subject = new URL(httpHost).getHost.split("[.]")(0)
      ((subject, teacher), 1)
    })

    //聚合,将学科和老师联合当做key
    val reduced: RDD[((String, String), Int)] = sbjectTeacherAndOne.reduceByKey(_+_)

    //计算有多少学科
    val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()

    //自定义一个分区器,并且按照指定的分区器进行分区
    val sbPatitioner = new SubjectParitioner(subjects);

    //partitionBy按照指定的分区规则进行分区
    //调用partitionBy时RDD的Key是(String, String)
    val partitioned: RDD[((String, String), Int)] = reduced.partitionBy(sbPatitioner)

    //一次拿出一个分区(可以操作一个分区中的数据了)
    val sorted: RDD[((String, String), Int)] = partitioned.mapPartitions(it => {
      //将迭代器转换成list,然后排序,在转换成迭代器返回
      it.toList.sortBy(_._2).reverse.take(topN).iterator
    })

    //
    val r: Array[((String, String), Int)] = sorted.collect()
    println(r.toBuffer)
    sc.stop()
  }
}

//自定义分区器
//思想就是把每一种给一个编号,每一个编号下的分区都是该学科的数据
class SubjectParitioner(sbs: Array[String]) extends Partitioner {

  //相当于主构造器(new的时候会执行一次)
  //用于存放规则的一个map
  val rules = new mutable.HashMap[String, Int]()
  var i = 0
  for(sb <- sbs) {
    //rules(sb) = i
    rules.put(sb, i)
    i += 1
  }

  //返回分区的数量(下一个RDD有多少分区)
  override def numPartitions: Int = sbs.length

  //根据传入的key计算分区标号
  //key是一个元组(String, String)
  override def getPartition(key: Any): Int = {
    //获取学科名称
    val subject = key.asInstanceOf[(String, String)]._1
    //根据规则计算分区编号
    rules(subject)
  }
}

全部评论

相关推荐

04-02 10:09
门头沟学院 Java
用微笑面对困难:这里面问题还是很多的,我也不清楚为啥大家会感觉没啥问题。首先就是全栈开发实习9个月的内容都没有java实习生的内容多,1整个技术栈没看出太核心和难点的内容,感觉好像被拉过去打杂了,而且全栈基本上很容易被毙。里面能问的bug是在太多了比如L:继承 BaseMapper 可直接使用内置方法’。请问你的 BaseMapper 是如何扫描实体类注解如果瞬时产生 100 个上传任务,MySQL 的索引设计是否会有瓶颈?你做过分库分表或者索引优化吗?全栈的内容可以针对动态难点去搞,技能特长写在下面吧,你写了这么多技能,项目和实习体现了多少?你可以在项目里多做文章然后把这个放下去,从大致来看实习不算太水,有含金量你也要写上内容针对哨兵里面的节点变化能问出一万个问题,这个很容易就爆了。
提前批简历挂麻了怎么办
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务