Developing a Spark Word Count with IDEA + Maven

Dependencies to add to pom.xml

    <repositories>
        <repository>
            <id>cloudera</id>
            <name>cloudera</name>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-cdh5.7.0</version>
        </dependency>
  </dependencies>
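
Note: compiling the Scala source with Maven also requires a Scala compiler plugin in the <build> section. A minimal sketch using scala-maven-plugin (the version number here is an assumption; adjust to your environment):

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>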

Developing WordCount in IDEA

package com.ruozedata.bigdata.spark01

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // master and app name are supplied by spark-submit (--master, --name)
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    // args(0): input path (local or HDFS)
    val textFile = sc.textFile(args(0))
    // split each line on tabs, map each word to (word, 1), then sum counts per word
    val wc = textFile.flatMap(line => line.split("\t")).map((_, 1)).reduceByKey(_ + _)
    wc.collect().foreach(println)
    sc.stop()
  }
}
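
For quick testing inside IDEA (without spark-submit), the master and app name can be set on the SparkConf instead; a minimal sketch, to be removed before packaging for the cluster:

val sparkConf = new SparkConf()
  .setAppName("WordCount")
  .setMaster("local[2]") // run locally with 2 worker threads
val sc = new SparkContext(sparkConf)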

Run mvn package to build the jar, then upload it to the cloud host with the rz command.

Submit the job, using the data file on HDFS:

spark-submit \
--class com.ruozedata.bigdata.spark01.WordCount  \
--master local[2] \
/home/hadoop/lib/g6_spark-1.0.jar   \
hdfs://hadoop001:9000/data


19/05/02 22:02:03 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
19/05/02 22:02:03 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1267 bytes result sent to driver
19/05/02 22:02:03 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1240 bytes result sent to driver
19/05/02 22:02:03 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 54 ms on localhost (executor driver) (1/2)
19/05/02 22:02:03 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 53 ms on localhost (executor driver) (2/2)
19/05/02 22:02:03 INFO DAGScheduler: ResultStage 1 (collect at WordCount.scala:17) finished in 0.050 s
19/05/02 22:02:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
19/05/02 22:02:03 INFO DAGScheduler: Job 0 finished: collect at WordCount.scala:17, took 0.817481 s
(hello,3)
(eurecom,2)
(yuan,1)
19/05/02 22:02:03 INFO SparkUI: Stopped Spark web UI at http://10.9.61.124:4040
19/05/02 22:02:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/02 22:02:03 INFO MemoryStore: MemoryStore cleared
19/05/02 22:02:03 INFO BlockManager: BlockManager stopped
19/05/02 22:02:03 INFO BlockManagerMaster: BlockManagerMaster stopped

Improvement 1
Save the results to a specified output path

package com.ruozedata.bigdata.spark01

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_, 1)).reduceByKey(_ + _)
    // args(1): output directory; results are written as part files, one per partition
    wc.saveAsTextFile(args(1))
    sc.stop()
  }
}
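
One caveat: saveAsTextFile fails if the output directory already exists. A minimal sketch of deleting it up front with the Hadoop FileSystem API (this delete step is a suggested addition, not part of the original job):

import org.apache.hadoop.fs.{FileSystem, Path}

// delete the output directory if it already exists, so the job can be re-run
val fs = FileSystem.get(sc.hadoopConfiguration)
val outputPath = new Path(args(1))
if (fs.exists(outputPath)) {
  fs.delete(outputPath, true) // true = recursive
}

Submit again, now passing both the input and output paths: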
spark-submit \
--class com.ruozedata.bigdata.spark01.WordCount  \
--master local[2] \
/home/hadoop/lib/g6_spark-1.0.jar   \
hdfs://hadoop001:9000/data  hdfs://hadoop001:9000/wc_output

[hadoop@hadoop001 ~]$ hadoop fs -ls /wc_output
Found 3 items
-rw-r--r--   3 hadoop supergroup          0 2019-05-02 22:11 /wc_output/_SUCCESS
-rw-r--r--   3 hadoop supergroup         22 2019-05-02 22:11 /wc_output/part-00000
-rw-r--r--   3 hadoop supergroup          9 2019-05-02 22:11 /wc_output/part-00001

[hadoop@hadoop001 ~]$ hadoop fs -text /wc_output/part*
19/05/02 22:14:31 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
19/05/02 22:14:31 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
(hello,3)
(eurecom,2)
(yuan,1)

Improvement 2
Word count sorted by frequency
sortByKey() sorts in ascending order by default; sortByKey(false) sorts in descending order

package com.ruozedata.bigdata.spark01

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)

    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_, 1)).reduceByKey(_ + _)
    // swap to (count, word) so sortByKey sorts by count, then swap back to (word, count)
    val sorted = wc.map(x => (x._2, x._1)).sortByKey().map(x => (x._2, x._1))
    sorted.saveAsTextFile(args(1))
    sc.stop()
  }
}
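
As a side note, the same result can be had without the double swap via RDD.sortBy, which takes a key-extraction function; a minimal sketch (descending order shown, assuming the same wc RDD as above):

// sort the (word, count) pairs directly by the count field
val sorted = wc.sortBy(_._2, ascending = false)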