【大数据面试题】Spark-Streaming

1-Spark Streaming第一次运行不丢失数据

kafka参数 auto.offset.reset 参数设置成earliest 从最初始偏移量开始消费数据

2-Spark Streaming精准一次消费

  1. 手动维护偏移量
  2. 处理完业务数据后,再进行提交偏移量操作

极端情况下,如在提交偏移量时断网或停电会造成spark程序第二次启动时重复消费问题,所以在涉及到金额或精确性非常高的场景会使用事物保证精准一次消费

3-Spark Streaming控制每秒消费数据的速度

通过spark.streaming.kafka.maxRatePerPartition参数来设置Spark Streaming从kafka分区每秒拉取的条数

4-Spark Streaming背压机制

Spark Streaming 反压机制是1.5版本推出的特性,用来解决处理速度比摄入速度慢的情况,简单来讲就是做流量控制。当批处理时间(Batch Processing Time)大于批次间隔(Batch Interval,即 BatchDuration)时,说明处理数据的速度小于数据摄入的速度,持续时间过长或源头数据暴增,容易造成数据在内存中堆积,最终导致Executor OOM。

在这种情况下,若是基于Kafka Receiver的数据源,可以通过设置spark.streaming.receiver.maxRate来控制最大输入速率;

若是基于Direct的数据源(如Kafka Direct Stream),则可以通过设置spark.streaming.kafka.maxRatePerPartition来控制最大输入速率。

当然,在事先经过压测,且流量高峰不会超过预期的情况下,设置这些参数一般没什么问题。但最大值,不代表是最优值,最好还能根据每个批次处理情况来动态预估下个批次最优速率。在Spark 1.5.0以上,就可通过背压机制来实现。开启反压机制,即设置spark.streaming.backpressure.enabled为true,Spark Streaming会自动根据处理能力来调整输入速率,从而在流量高峰时仍能保证最大的吞吐和性能。

---Spark Streaming的反压机制中,有以下几个重要的组件:

  • RateControllerRateController 组件是 JobScheduler 的监听器,主要监听集群所有作业的提交、运行、完成情况,并从 BatchInfo 实例中获取以下信息,交给速率估算器(RateEstimator)做速率的估算。
  • 当前批次任务处理完成的时间戳 (processingEndTime)该批次从第一个 job 到最后一个 job 的实际处理时长 (processingDelay)该批次的调度时延,即从被提交到 JobScheduler 到第一个 job 开始处理的时长(schedulingDelay)该批次输入数据的总条数(numRecords)
  • RateEstimatorSpark 2.x 只支持基于 PID 的速率估算器,这里只讨论这种实现。基于 PID 的速率估算器简单地说就是它把收集到的数据(当前批次速率)和一个设定值(上一批次速率)进行比较,然后用它们之间的差计算新的输入值,估算出一个合适的用于下一批次的流量阈值。这里估算出来的值就是流量的阈值,用于更新每秒能够处理的最大记录数
  • RateLimiter以上这两个组件都是在Driver端用于更新最大速度的,而RateLimiter是用于接收到Driver的更新通知之后更新Executor的最大处理速率的组件。RateLimiter是一个抽象类,它并不是Spark本身实现的,而是借助了第三方Google的GuavaRateLimiter来产生的。它实质上是一个限流器,也可以叫做令牌,如果Executor中task每秒计算的速度大于该值则阻塞,如果小于该值则通过,将流数据加入缓存中进行计算。
反压机制真正起作用时需要至少处理一个批:由于反压机制需要根据当前批的速率,预估新批的速率,所以反压机制真正起作用前,应至少保证处理一个批。

* 如何保证反压机制真正起作用前应用不会崩溃:要保证反压机制真正起作用前应用不会崩溃,需要控制每个批次最大摄入速率。若为Direct Stream,如Kafka Direct Stream,则可以通过spark.streaming.kafka.maxRatePerPartition参数来控制。此参数代表了 每秒每个分区最大摄入的数据条数。假设BatchDuration为10秒,spark.streaming.kafka.maxRatePerPartition为12条,kafka topic 分区数为3个,则一个批(Batch)最大读取的数据条数为360条(3*12*10=360)。同时,需要注意,该参数也代表了整个应用生命周期中的最大速率,即使是背压调整的最大值也不会超过该参数。

5-Spark Streaming一个stage耗时

Spark Streaming stage耗时由最慢的task决定,所以数据倾斜时某个task运行慢会导致整个Spark Streaming都运行非常慢。

6-Spark Streaming优雅关闭

把spark.streaming.stopGracefullyOnShutdown参数设置成ture,Spark会在JVM关闭时正常关闭StreamingContext,而不是立马关闭

Kill 命令:yarn application -kill 后面跟 applicationid

7-Spark Streaming默认分区个数

Spark Streaming默认分区个数与所对接的kafka topic分区个数一致,Spark Streaming里一般不会使用repartition算子增大分区,因为repartition会进行shuffle增加耗时。

8-SparkStreaming有哪几种方式消费Kafka中的数据,它们之间的区别是什么?

一、基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

二、基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

优点如下:

简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

一次且仅一次的事务机制。

三、对比:

基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

在实际生产环境中大都用Direct方式

9-简述SparkStreaming窗口函数的原理(重点)

窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。

图中time1就是SparkStreaming计算批次大小,虚线框以及实线大框就是窗口的大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。

#秋招##大数据#
全部评论

相关推荐

暑期是进不了大厂了想问问前端友友们 ,后面应该如何沉淀自己,我想秋招再冲一下尤其是八股,应该抓哪一块是重点,理解到什么程度呢,要学到什么深度才能抗住拷打。还有场景题如何去准备。期待友友们的解答。
命烈焰带我飞走:找个中厂小厂先看看吧,去了熟悉熟悉项目,简历上扒点东西,之后刷刷sobb上百度美团快手的日常实习,流程都比较快轮次也少,别给自己太大压力,一步一步来,先不用想着暑期,转正,秋招那些事情,另外如果可能的话可以关注下面试时候的形象,穿搭,环境这些,其实实习主要就是看个眼缘,看着好看声音好听其实加分不少..八股这些不要死记硬背,挨个拿去问问chatgpt,这个东西做出来是为了解决什么问题,有啥效果,自己有想法有个模糊的概念就可以了,人家也知道你是学生,实习生没有什么kpi,放你去面都是希望能把你招进去的,场景题算法题没做过你可以边试着写边跟面试官说你的想法思路,也可以直说没见过让他们给你提示,反正最后都是与或非顺序分支循环存取值那套。总之建议是别为了秋招..出去旅旅游放松放松,少投几家少背八股多写写代码
点赞 评论 收藏
分享
从输入URL到页面加载发生了什么:总体来说分为以下几个过程: 1.DNS解析 2.TCP连接 3.发送HTTP请求 4.服务器处理请求并返回HTTP报文 5.浏览器解析渲染页面 6.连接结束。简述了一下各个过程的输入输出作用:以下是对从输入 URL 到页面加载各过程的输入、输出或作用的一句话描述:DNS 解析: 输入:用户在浏览器地址栏输入的域名(如 www.example.com)。输出:对应的 IP 地址(如 192.168.1.1)。作用:将易于记忆的域名转换为计算机能够识别和用于网络通信的 IP 地址,以便浏览器与目标服务器建立连接。TCP 连接: 输入:浏览器获得的服务器...
明天不下雨了:参考一下我的说法: 关键要讲出输入网址后涉及的每一个网络协议的工作原理和作用: 涉及到的网络协议: HTTP/HTTPS协议->DNS协议->TCP协议->IP协议->ARP协议 面试参考回答: 第一次访问(本地没有缓存时): 一般我们在浏览器地址栏输入的是一个域名。 浏览器会先解析 URL、解析出域名、资源路径、端口等信息、然后构造 HTTP 请求报文。浏览器新开一个网络线程发起HTTP请求(应用层) 接着进行域名解析、将域名解析为 IP 地址 浏览器会先检查本地缓存(包括浏览器 DNS 缓存、操作系统缓存等)是否已解析过该域名 如果没有、则向本地 DNS 服务器请求解析; 本地服务器查不到会向更上层的 DNS 服务器(根域名服务器->顶级域名服务器->权威域名服务器询问)递归查询 最终返回该域名对应的 IP 地址。(应用层DNS协议)DNS 协议的作用: 将域名转换为 IP 地址。 由于 HTTP 是基于 TCP 传输的、所以在发送 HTTP 请求前、需要进行三次握手、在客户端发送第一次握手的时候、( 浏览器向服务器发送一个SYN(同步)报文、其中包含客户端的初始序列号。TCP头部设置SYN标志位、并指定客户端端口 同时填上目标端口和源端口的信息。源端口是浏览器随机生成的、目标端口要看是 HTTP 还是 HTTPS、如果是 HTTP 默认目标端口是 80、如果是 HTTPS 默认是 443。(传输层) 然后到网络层:涉及到(IP协议) 会将TCP报文封装成IP数据包、添加IP头部,包含源IP地址(浏览器)和目标IP地址(服务器)。IP 协议的作用: 提供无连接的、不可靠的数据包传输服务。 然后到数据链路层、会通过 ARP 协议、获取目标的路由器的 MAC 地址、然后会加上 MAC 头、填上目标 MAC 地址和源 MAC 地址。 然后到物理层之后、直接把数据包、转发给路由器、路由器再通过下一跳、最终找到目标服务器、然后目标服务器收到客户的 SYN 报文后,会响应第二次握手。 当双方都完成三次握手后、如果是 HTTP 协议、客户端就会将 HTTP 请求就会发送给目标服务器。如果是 HTTPS 协议、客户端还要和服务端进行 TLS 四次握手之后、客户端才会将 HTTP 报文发送给目标服务器。 目标服务器收到 HTTP 请求消息后、就返回 HTTP 响应消息、浏览器会对响应消息进行解析渲染、呈现给用户
点赞 评论 收藏
分享
04-03 11:37
武汉大学 Java
高斯林的信徒:武大简历挂?我勒个骚岗
点赞 评论 收藏
分享
我是985研究生,最近学校在组织开题,大家都在非常紧张地准备,但我一直进入不了状态,很想做但是心又很浮躁。但我的室友们感觉都非常认真,每天醒来就开始看论文,睡着前最后一件事还是在看论文,我非常焦虑。我感觉自己甚至有点把大家当做假想敌了。这种比较心态还存在于生活的各种方面:看到有钱的同学会非常羡慕,看到朋友圈里面环游世界的留学生同学也会羡慕,看到那些工作后有自己的钱而过上较为阔绰的生活的时候还是羡慕,就仿佛只有自己一个人在阴暗爬行。而且这些比较是每时每刻的,为了不比较,我已经关闭了朋友圈,但是每次偶尔刷一下还是会难受很久。我知道比较是偷走幸福的小偷,但我好像控制不了,感觉自己是一个偷窥别人生活的...
若怜君欢:担心开题搞砸了,幻想拥有别人的生活,本质上是因为自卑,楼主小时候大概率是留守儿童或者父母关系很紧张,导致楼主没有安全感、焦虑、内耗。 这样的情况最好的办法就是建立自信和降低期待,建立自信不是一蹴而就,而是循序渐进,比如告诉自己允许自己第一次没把事情做好,失败了能搞清楚其中缘由而不是全盘否定自己,失败不是终点,放弃才是;降低期待只要记住一句话即可,能伴随你一生的,只有经验和学识,所以你对事情的态度应该更多地去思考它是否能带来学识和经验的增长,而不是仅仅用短期的利益作为唯一期待。 人生不是一成不变的,它是可以迭代更新的,去归纳总结自身的不足并结合实际去改进,去尝试一些新的思路和方法,不要固执钻牛角尖,也不要反复横跳,为自己设立一个高度聚集的精神内核,内核之上可以去尝试一切有利于自己更好的方式 以上就是我个人对生活的理解,共勉
点赞 评论 收藏
分享
评论
7
16
分享

创作者周榜

更多
牛客网
牛客企业服务