Spark Streaming Real-Time Stream Processing Project 11: Putting It All Together

The series so far:
Part 1: The distributed log collection framework Flume
Part 2: The distributed message queue Kafka
Part 3: Integrating Flume and Kafka for real-time data collection
Part 4: Setting up the project environment
Part 5: Getting started with Spark Streaming
Part 6: Spark Streaming in practice (1)
Part 7: Spark Streaming in practice (2)
Part 8: Integrating Spark Streaming with Flume
Part 9: Integrating Spark Streaming with Kafka
Part 10: Building the log generator and producing log output with log4j
Part 11: Putting it all together (this post)

Source code
1. Use a Python script to simulate a website generating access logs in real time:
# coding=UTF-8
import random
import time

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list"
]

ip_slices = [132, 156, 124, 10, 29, 167, 143, 187, 30, 46, 55, 63, 72, 87, 98, 168]

http_referers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}"
]

search_keyword = [
    "Spark SQL实战",
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据面试"
]

status_codes = ["200", "404", "500"]


def sample_url():
    return random.sample(url_paths, 1)[0]


def sample_ip():
    # pick four random slices and join them into a fake IP address
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])


def sample_referer():
    # roughly 80% of the requests carry no referer
    if random.uniform(0, 1) > 0.2:
        return "-"
    refer_str = random.sample(http_referers, 1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])


def sample_status_code():
    return random.sample(status_codes, 1)[0]


def generate_log(count=10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # "w+" truncates the file on every run; the Flume exec source used later runs
    # tail -F, which follows the file by name and keeps up after the truncation
    f = open("/root/DataSet/access.log", "w+")
    while count >= 1:
        query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\" \t{status_code}\t{referer}".format(
            url=sample_url(), ip=sample_ip(), referer=sample_referer(),
            status_code=sample_status_code(), local_time=time_str)
        print(query_log)
        f.write(query_log + "\n")
        count = count - 1
    f.close()


if __name__ == '__main__':
    generate_log(100)
Running the script by hand every time is not how things work in production, so we use a crontab expression to have the Python script generate a batch of data every minute.
Linux crontab (5 fields):
*     *     *     *     *
-     -     -     -     -
|     |     |     |     |
|     |     |     |     +----- day of week (0 - 7) (Sunday = 0 or 7)
|     |     |     +----------- month (1 - 12)
|     |     +----------------- day of month (1 - 31)
|     +----------------------- hour (0 - 23)
+----------------------------- min (0 - 59)

Java schedulers (Quartz-style cron, which Spring can use) add a seconds field and an optional year:
*     *     *     *     *     *     *
-     -     -     -     -     -     -
|     |     |     |     |     |     |
|     |     |     |     |     |     +----- year [optional]
|     |     |     |     |     +----------- day of week
|     |     |     |     +----------------- month (1 - 12)
|     |     |     +----------------------- day of month (1 - 31)
|     |     +----------------------------- hour (0 - 23)
|     +----------------------------------- min (0 - 59)
+----------------------------------------- second (0 - 59)
The crontab expression for running once every minute is: */1 * * * *
Create a script, log_generator.sh, containing a single line: python /root/Project/generate_log.py
Make it executable: chmod u+x log_generator.sh
Running log_generator.sh now runs the Python program and produces a batch of log lines.
The crontab entry that runs log_generator.sh once per minute looks like this: */1 * * * * /root/Project/log_generator.sh
Run crontab -e, paste in */1 * * * * /root/Project/log_generator.sh, save and exit, and the job starts running.
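To confirm the job is in place, crontab -l lists the installed entries, and tail -f /root/DataSet/access.log should show a fresh batch of log lines appearing every minute.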
2. Feed the logs produced by the Python generator into Flume
Write a Flume configuration file, streaming_project.conf:
exec-memory-logger.sources=exec-source
exec-memory-logger.sinks=logger-sink
exec-memory-logger.channels=memory-channel
exec-memory-logger.sources.exec-source.type=exec
exec-memory-logger.sources.exec-source.command=tail -F /root/DataSet/access.log
exec-memory-logger.sources.exec-source.shell=/bin/sh -c
exec-memory-logger.channels.memory-channel.type=memory
exec-memory-logger.sinks.logger-sink.type=logger
exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.logger-sink.channel=memory-channel
Start command: flume-ng agent -n exec-memory-logger -c /soft/flume1.6/conf/ -f /soft/flume1.6/conf/streaming_project.conf -Dflume.root.logger=INFO,console
If the console prints events like the following, the hookup works:
2019-02-26 11:08:06,713 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 33 32 2E 33 30 2E 38 37 2E 36 33 09 32 30 31 132.30.87.63.201 }
2019-02-26 11:08:06,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 33 30 2E 31 33 32 2E 31 36 37 2E 37 32 09 32 30 30.132.167.72.20 }
2019-02-26 11:08:06,714 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 34 2E 31 35 36 2E 36 33 2E 35 35 09 32 30 124.156.63.55.20 }
3. Modify the Flume configuration so that the sink writes the data into Kafka:
streaming_project2.conf:
exec-memory-logger.sources=exec-source
exec-memory-logger.sinks=kafka-sink
exec-memory-logger.channels=memory-channel
exec-memory-logger.sources.exec-source.type=exec
exec-memory-logger.sources.exec-source.command=tail -F /root/DataSet/access.log
exec-memory-logger.sources.exec-source.shell=/bin/sh -c
exec-memory-logger.channels.memory-channel.type=memory
exec-memory-logger.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
exec-memory-logger.sinks.kafka-sink.brokerList=hadoop0:9092,hadoop1:9092,hadoop2:9092,hadoop3:9092
exec-memory-logger.sinks.kafka-sink.topic=streamingtopic
exec-memory-logger.sinks.kafka-sink.batchSize=5
exec-memory-logger.sinks.kafka-sink.requiredAcks=1
exec-memory-logger.sources.exec-source.channels=memory-channel
exec-memory-logger.sinks.kafka-sink.channel=memory-channel
Start a Kafka console consumer on another machine: ./kafka-console-consumer.sh --zookeeper hadoop0:2181 --topic streamingtopic, then start Flume: flume-ng agent -n exec-memory-logger -c /soft/flume1.6/conf/ -f /soft/flume1.6/conf/streaming_project2.conf -Dflume.root.logger=INFO,console
If the Kafka consumer prints data like the following, the integration works:
187.168.10.167 2019-02-26 11:24:01 "GET /class/112.html HTTP/1.1" - 200
156.10.124.29 2019-02-26 11:24:01 "GET /class/131.html HTTP/1.1" - 500
63.46.29.187 2019-02-26 11:24:01 "GET /class/145.html HTTP/1.1" - 200
87.10.124.167 2019-02-26 11:24:01 "GET /class/112.html HTTP/1.1" - 500
63.29.72.132 2019-02-26 11:24:01 "GET /class/112.html HTTP/1.1" - 500
30.63.124.98 2019-02-26 11:24:01 "GET /class/128.html HTTP/1.1" https://search.yahoo.com/search?p=Hadoop基础 200
4. Next, hook Spark Streaming up to Kafka
This opens up the whole Flume & Kafka & Spark Streaming pipeline; in the Spark application we receive the Kafka data and count the number of records.
Write the Spark Streaming application:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * @author YuZhansheng
  * @desc   Process the data coming from Kafka with Spark Streaming
  * @create 2019-02-26 11:40
  */
object ImoocStatStreamingApp {

  def main(args: Array[String]): Unit = {

    // exit unless exactly four arguments are supplied
    if (args.length != 4) {
      println("Usage: ImoocStatStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }

    val Array(zkQuorum, groupId, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("ImoocStatStreamingApp").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)

    // test step 1: verify that data is being received
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
For testing, the four program arguments are entered in IDEA's run configuration.
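For example, a set of arguments consistent with the earlier steps would be: hadoop0:2181 test streamingtopic 1 (the ZooKeeper address and topic come from the Flume/Kafka setup above; the group id "test" and the single thread per topic are just placeholder choices of mine).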
Console output like the following means the integration works; the count of 100 matches the 100 log lines the generator writes each minute, collected into one 60-second batch:
-------------------------------------------
Time: 1551165180000 ms
-------------------------------------------
100
5. Data cleansing: extract the fields we need from the raw logs
Create a new utility class, DateUtils:
import java.util.Date

import org.apache.commons.lang3.time.FastDateFormat

/**
  * @author YuZhansheng
  * @desc   Date/time utility class
  * @create 2019-02-26 15:22
  */
object DateUtils {

  // FastDateFormat is thread-safe, unlike SimpleDateFormat, which matters inside Spark tasks
  val YYYYMMDDHHMMSS_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  val TARGE_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // raw log time ("yyyy-MM-dd HH:mm:ss") => epoch milliseconds
  def getTime(time: String) = {
    YYYYMMDDHHMMSS_FORMAT.parse(time).getTime
  }

  // raw log time => "yyyyMMddHHmmss"
  def parseToMinute(time: String) = {
    TARGE_FORMAT.format(new Date(getTime(time)))
  }

  def main(args: Array[String]): Unit = {
    println(parseToMinute("2019-02-26 15:22:01"))
  }
}
Then create a ClickLog domain object:
/**
  * @author YuZhansheng
  * @desc   A cleaned log record
  * @param ip         the visitor's IP address
  * @param time       the access time
  * @param courseId   the id of the hands-on course that was accessed
  * @param statusCode the HTTP status code
  * @param referer    the referer of the request
  * @create 2019-02-26 15:43
  */
case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)
Append the following to the Spark Streaming program above to implement the data cleansing (the two imports go at the top of the file):
import com.xidian.spark.project.domain.ClickLog
import com.xidian.spark.project.utils.DateUtils

    // test step 1: verify that data is being received
    //messages.map(_._2).count().print()

    // test step 2: data cleansing
    val logs = messages.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")

      // infos(2) = "GET /class/112.html HTTP/1.1"
      // url      = /class/112.html
      val url = infos(2).split(" ")(1)
      var courseId = 0

      // extract the course number for URLs that start with /class
      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }

      ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    cleanData.print()
Run the program; console output like the following shows that the cleansing works:
-------------------------------------------
Time: 1551169320000 ms
-------------------------------------------
ClickLog(187.156.167.30,20190226162102,130,404,-)
ClickLog(72.87.46.124,20190226162102,128,200,https://www.sogou.com/web?query=Hadoop基础)
ClickLog(30.168.46.187,20190226162102,146,500,-)
ClickLog(72.143.168.63,20190226162102,128,200,-)
ClickLog(168.30.132.98,20190226162102,145,404,-)
ClickLog(132.29.143.10,20190226162102,146,200,-)
..........
6. Requirement: count the visits, up to today, of the hands-on courses (URLs starting with /class)
Analysis: we need a database to store the statistics; Spark Streaming writes the running totals into it, and the results are later looked up and displayed by yyyyMMdd and courseid.
Which database should hold the results?
RDBMS: MySQL, Oracle...
NoSQL: HBase, Redis...
We go with HBase. No debate. O(∩_∩)O
Start HBase first: ./start-hbase.sh
Start the HBase shell: ./hbase shell
Use the list command to show all tables.
However, the following error appeared:
hbase(main):004:0* list
TABLE
ERROR: org.apache.hadoop.hbase.PleaseHoldException: Master is initializing
at org.apache.hadoop.hbase.master.HMaster.checkInitialized(HMaster.java:2293)
at org.apache.hadoop.hbase.master.MasterRpcServices.getTableNames(MasterRpcServices.java:900)
at org.apache.hadoop.hbase.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java:55650)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2180)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
at java.lang.Thread.run(Thread.java:748)
I tried many fixes found online, for example changing hbase.rootdir in hbase-site.xml to:
<property>
<name>hbase.rootdir</name>
<value>hdfs://redhat6:9000/hbase</value>
</property>
That did not solve my problem. A comment under one post suggested another approach:
Run date to check whether the nodes' clocks are in sync. If they are not (in my case that was the cause about 80% of the time), set them with date -s "2019-02-26 14:18" (pick your own time). Once all cluster nodes are in sync, restart HBase and the problem is gone.
Create an HBase table: create 'imooc_course_clickcount','info'
Rowkey design: day_courseid
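As a quick illustration (a throwaway sketch of mine; the object name is made up, but the substring logic matches the aggregation code added to the streaming job later), the rowkey is derived from the cleaned ClickLog fields like this:
object RowkeyDemo {
  def main(args: Array[String]): Unit = {
    val time = "20190226162102"   // ClickLog.time after DateUtils.parseToMinute
    val courseId = 128
    val rowkey = time.substring(0, 8) + "_" + courseId
    println(rowkey)               // prints 20190226_128
  }
}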
Next, operate on HBase from Scala:
First define an entity class:
/**
  * @author YuZhansheng
  * @desc   Visit count of a hands-on course
  *         day_course:  the HBase rowkey, in the form 20190227_1
  *         click_count: the number of visits to that course on that day
  * @create 2019-02-26 19:19
  */
case class CourseClickCount(day_course: String, click_count: Long)
Database access goes through a DAO layer: create a new package and, inside it, an object CourseClickCountDAO.
We also need a utility class, HBaseUtils.java; it is a Java class (so it lives under the Java sources) and does the actual HBase operations. The two classes are as follows:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * @author YuZhansheng
 * @desc   HBase utility class; Java utility classes are best wrapped as singletons
 * @create 2019-02-27 10:11
 */
public class HBaseUtils {

    HBaseAdmin admin = null;
    Configuration configuration = null;

    // a singleton needs a private constructor
    private HBaseUtils() {
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop0,hadoop1,hadoop2,hadoop3");
        configuration.set("hbase.rootdir", "hdfs://hadoop0:9000/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // lazily-initialized singleton instance
    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance() {
        if (null == instance) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    // get an HTable instance by table name
    public HTable getTable(String tableName) {
        HTable table = null;
        try {
            table = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Add one record to an HBase table
     * @param tableName the HBase table name
     * @param rowkey    the rowkey
     * @param cf        the column family
     * @param column    the column
     * @param value     the value to write
     */
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        HTable table = getTable(tableName);

        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));

        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // test data; comment this main method out when using the class
    public static void main(String[] args) {
        //HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
        //System.out.println(table.getName().getNameAsString());

        String tableName = "imooc_course_clickcount";
        String rowkey = "20190111_88";
        String cf = "info";
        String column = "click_count";
        String value = "2";

        HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    }
}
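One note on the API: HBaseAdmin, the HTable constructor and Put.add used above belong to the older HBase client API. They still compile against the HBase 0.9x/1.x line, but are deprecated there and removed in HBase 2.x, where you would go through ConnectionFactory / Connection / Table and Put.addColumn instead; adjust accordingly if you are on a more recent release.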
package com.xidian.spark.project.dao

import com.xidian.spark.project.domain.CourseClickCount
import com.xidian.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

/**
  * @author YuZhansheng
  * @desc   Data access layer for the hands-on course click counts
  * @create 2019-02-27 9:51
  */
object CourseClickCountDAO {

  val tableName = "imooc_course_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  // save data to HBase
  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count
      )
    }
  }

  // look a value up by rowkey
  def count(day_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if (value == null) {
      0L
    } else {
      Bytes.toLong(value)
    }
  }

  // quick check that the DAO works
  def main(args: Array[String]): Unit = {
    val list = new ListBuffer[CourseClickCount]
    list.append(CourseClickCount("20190227_8", 8))
    list.append(CourseClickCount("20190227_9", 18))
    list.append(CourseClickCount("20190227_1", 12))

    save(list)
  }
}
Test: run the main method (the test method) of CourseClickCountDAO, then run scan 'imooc_course_clickcount' in the HBase shell and check the output:
hbase(main):005:0> scan 'imooc_course_clickcount'
ROW COLUMN+CELL
20190211_88 column=info:click_count, timestamp=1551235301383, value=9
20190227_1 column=info:click_count, timestamp=1551236147161, value=\x00\x00\x00\x00\x00\x00\x00\x0C
20190227_8 column=info:click_count, timestamp=1551236147119, value=\x00\x00\x00\x00\x00\x00\x00\x08
20190227_9 column=info:click_count, timestamp=1551236147150, value=\x00\x00\x00\x00\x00\x00\x00\x12
4 row(s) in 0.0650 seconds
The data was inserted successfully.
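You can also read a value back through the DAO itself, for example:
    println(CourseClickCountDAO.count("20190227_8"))
which prints the accumulated click_count stored under that rowkey.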
Last step: extend the Spark Streaming program to finish the requirement, counting the visits up to today of the hands-on courses (URLs starting with /class) and writing the counts into HBase. Add the code below to ImoocStatStreamingApp, start the crontab job that produces logs, start Flume, Kafka and HBase, run the program, and watch the data in HBase grow.
    // also add to the imports: scala.collection.mutable.ListBuffer,
    // com.xidian.spark.project.domain.CourseClickCount, com.xidian.spark.project.dao.CourseClickCountDAO
    //cleanData.print()

    // test step 3: total visits, up to today, of the hands-on courses (URLs starting with /class)
    cleanData.map(x => {
      // HBase rowkey design: 20190226_8
      (x.time.substring(0, 8) + "_" + x.courseId, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseClickCount]
        partitionRecords.foreach(pair => {
          list.append(CourseClickCount(pair._1, pair._2))
        })
        CourseClickCountDAO.save(list)
      })
    })
hbase(main):006:0> scan 'imooc_course_clickcount'
ROW COLUMN+CELL
20190211_88 column=info:click_count, timestamp=1551235301383, value=9
20190226_112 column=info:click_count, timestamp=1551238136748, value=\x00\x00\x00\x00\x00\x00\x00
20190226_128 column=info:click_count, timestamp=1551238136477, value=\x00\x00\x00\x00\x00\x00\x00\x18
20190226_130 column=info:click_count, timestamp=1551238136755, value=\x00\x00\x00\x00\x00\x00\x00\x14
20190226_131 column=info:click_count, timestamp=1551238136488, value=\x00\x00\x00\x00\x00\x00\x00\x1C
20190226_145 column=info:click_count, timestamp=1551238136767, value=\x00\x00\x00\x00\x00\x00\x00\x19
20190226_146 column=info:click_count, timestamp=1551238136512, value=\x00\x00\x00\x00\x00\x00\x00\x16
20190227_1 column=info:click_count, timestamp=1551236147161, value=\x00\x00\x00\x00\x00\x00\x00\x0C
That completes this requirement.
7. Requirement: count the clicks on hands-on courses that come from search engines
Create a new HBase table: create 'imooc_course_search_clickcount','info'
Rowkey design: based on the business requirement, day_searchEngine_courseId (for example 20190227_www.baidu.com_8).
Then create a new entity class, CourseSearchClickCount:
/**
  * Entity class: clicks on hands-on courses coming from search engines
  * @param day_search_course the rowkey, in the form day_searchEngine_courseId
  * @param click_count       the number of clicks
  */
case class CourseSearchClickCount(day_search_course: String, click_count: Long)
The data access layer (DAO):
import com.xidian.spark.project.domain.CourseSearchClickCount
import com.xidian.spark.project.utils.HBaseUtils
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.mutable.ListBuffer

/**
  * Data access layer for the clicks on hands-on courses that come from search engines
  */
object CourseSearchClickCountDAO {

  val tableName = "imooc_course_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  /**
    * Save data to HBase
    * @param list a collection of CourseSearchClickCount
    */
  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  /**
    * Look a value up by rowkey
    */
  def count(day_search_course: String): Long = {
    val table = HBaseUtils.getInstance().getTable(tableName)

    val get = new Get(Bytes.toBytes(day_search_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if (value == null) {
      0L
    } else {
      Bytes.toLong(value)
    }
  }

  // uncomment to smoke-test the DAO
  //  def main(args: Array[String]): Unit = {
  //    val list = new ListBuffer[CourseSearchClickCount]
  //    list.append(CourseSearchClickCount("20190227_www.baidu.com_8", 8))
  //    list.append(CourseSearchClickCount("20190227_cn.bing.com_9", 9))
  //
  //    save(list)
  //
  //    println(count("20190227_www.baidu.com_8") + " : " + count("20190227_cn.bing.com_9"))
  //  }
}
Finally, modify the Spark Streaming program and add the following to complete this statistic:
    // test step 4: visits, from search engines, to the hands-on courses up to now
    cleanData.map(x => {
      // e.g. https://www.sogou.com/web?query=Spark SQL实战 ==> https:/www.sogou.com/web?query=Spark SQL实战
      // collapsing "//" into "/" puts the host at splits(1) after splitting on "/"
      val referer = x.referer.replaceAll("//", "/")
      val splits = referer.split("/")
      var host = ""
      if (splits.length > 2) {
        host = splits(1)
      }

      (host, x.courseId, x.time)
    }).filter(_._1 != "").map(x => {
      // rowkey: day_searchEngine_courseId
      (x._3.substring(0, 8) + "_" + x._1 + "_" + x._2, 1)
    }).reduceByKey(_ + _).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecords => {
        val list = new ListBuffer[CourseSearchClickCount]
        partitionRecords.foreach(pair => {
          list.append(CourseSearchClickCount(pair._1, pair._2))
        })
        CourseSearchClickCountDAO.save(list)
      })
    })
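As before, you can verify the result with scan 'imooc_course_search_clickcount' in the HBase shell; the counts keyed by day_searchEngine_courseId should keep growing as long as the log generator, Flume, Kafka and the Spark job are all running.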