Word-count development on Spark with IDEA + Maven
Dependencies to add to the POM:
<repositories>
    <repository>
        <id>cloudera</id>
        <name>cloudera</name>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-cdh5.7.0</version>
    </dependency>
</dependencies>
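Note that `mvn package` will not compile Scala sources on its own; a Scala build plugin is normally needed in the POM as well. A minimal sketch (the plugin version here is an assumption; pick one that matches your Scala version):

```xml
<build>
    <plugins>
        <!-- compiles src/main/scala during the Maven build -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```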
Developing WordCount in IDEA
package com.ruozedata.bigdata.spark01

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    // read the input path passed as the first argument
    val textFile = sc.textFile(args(0))
    // split each line on tabs, pair every word with 1, then sum counts per word
    val wc = textFile.flatMap(line => line.split("\t")).map((_, 1)).reduceByKey(_ + _)
    wc.collect().foreach(println)
    sc.stop()
  }
}
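Since the job only runs end-to-end against a Spark runtime, it can help to sanity-check the transformation logic on plain Scala collections first. A sketch (the sample lines are made up; collections have no `reduceByKey`, so `groupBy` plus a per-group sum plays that role here):

```scala
object WordCountLogic {
  // Same flatMap -> map -> reduce-by-key shape as the RDD pipeline,
  // expressed on an ordinary Scala collection.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split("\t"))  // split each line on tabs, like the RDD version
      .map((_, 1))             // pair every word with a count of 1
      .groupBy(_._1)           // group the (word, 1) pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the 1s

  def main(args: Array[String]): Unit =
    wordCount(Seq("hello\tyuan", "hello\teurecom", "hello\teurecom"))
      .foreach(println)
}
```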
Build the jar with `mvn package` and upload it to the cloud host with the `rz` command.
Submit the job, reading the data file from HDFS:
spark-submit \
--class com.ruozedata.bigdata.spark01.WordCount \
--master local[2] \
/home/hadoop/lib/g6_spark-1.0.jar \
hdfs://hadoop001:9000/data
19/05/02 22:02:03 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 7 ms
19/05/02 22:02:03 INFO Executor: Finished task 0.0 in stage 1.0 (TID 2). 1267 bytes result sent to driver
19/05/02 22:02:03 INFO Executor: Finished task 1.0 in stage 1.0 (TID 3). 1240 bytes result sent to driver
19/05/02 22:02:03 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 54 ms on localhost (executor driver) (1/2)
19/05/02 22:02:03 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 3) in 53 ms on localhost (executor driver) (2/2)
19/05/02 22:02:03 INFO DAGScheduler: ResultStage 1 (collect at WordCount.scala:17) finished in 0.050 s
19/05/02 22:02:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/05/02 22:02:03 INFO DAGScheduler: Job 0 finished: collect at WordCount.scala:17, took 0.817481 s
(hello,3)
(eurecom,2)
(yuan,1)
19/05/02 22:02:03 INFO SparkUI: Stopped Spark web UI at http://10.9.61.124:4040
19/05/02 22:02:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/05/02 22:02:03 INFO MemoryStore: MemoryStore cleared
19/05/02 22:02:03 INFO BlockManager: BlockManager stopped
19/05/02 22:02:03 INFO BlockManagerMaster: BlockManagerMaster stopped
Improvement 1
Save the results to an output path given on the command line.
package com.ruozedata.bigdata.spark01

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_, 1)).reduceByKey(_ + _)
    // write the results to the output path passed as the second argument
    wc.saveAsTextFile(args(1))
    sc.stop()
  }
}
spark-submit \
--class com.ruozedata.bigdata.spark01.WordCount \
--master local[2] \
/home/hadoop/lib/g6_spark-1.0.jar \
hdfs://hadoop001:9000/data hdfs://hadoop001:9000/wc_output
[hadoop@hadoop001 ~]$ hadoop fs -ls /wc_output
Found 3 items
-rw-r--r-- 3 hadoop supergroup 0 2019-05-02 22:11 /wc_output/_SUCCESS
-rw-r--r-- 3 hadoop supergroup 22 2019-05-02 22:11 /wc_output/part-00000
-rw-r--r-- 3 hadoop supergroup 9 2019-05-02 22:11 /wc_output/part-00001
[hadoop@hadoop001 ~]$ hadoop fs -text /wc_output/part*
19/05/02 22:14:31 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
19/05/02 22:14:31 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev f1deea9a313f4017dd5323cb8bbb3732c1aaccc5]
(hello,3)
(eurecom,2)
(yuan,1)
Improvement 2
Word count sorted by frequency.
`sortByKey()` sorts ascending by default; `sortByKey(false)` sorts descending.
package com.ruozedata.bigdata.spark01

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val textFile = sc.textFile(args(0))
    val wc = textFile.flatMap(line => line.split("\t")).map((_, 1)).reduceByKey(_ + _)
    // swap (word, count) to (count, word), sort by count, then swap back
    val sorted = wc.map(x => (x._2, x._1)).sortByKey().map(x => (x._2, x._1))
    sorted.saveAsTextFile(args(1))
    sc.stop()
  }
}
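As an aside, the swap-sort-swap pattern works, but RDDs also offer `sortBy`, which sorts by a derived key directly, e.g. `wc.sortBy(_._2, ascending = false)` for descending counts. The equivalence of the two approaches can be checked on plain Scala collections; a sketch with made-up counts:

```scala
object SortByFrequency {
  val counts = Seq(("yuan", 1), ("hello", 3), ("eurecom", 2))

  // swap-sort-swap, mirroring the RDD code above
  val viaSwap = counts.map(x => (x._2, x._1)).sortBy(_._1).map(x => (x._2, x._1))

  // the same ordering expressed directly by sorting on the count field
  val viaSortBy = counts.sortBy(_._2)

  def main(args: Array[String]): Unit = {
    println(viaSwap)
    println(viaSortBy)
  }
}
```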