高并发技巧-流量聚合和高并发写入处理技巧

这篇文章我将介绍对于写入流量高的处理技巧,并且介绍一款快手开源的很实用的用来做流量聚合的工具BufferTrigger。通过这篇文章的介绍,相信对于大多写流量(后面就称为写qps吧)高的业务场景都能找到解决方案。

读流量(qps)高的场景其实见的更多,比如常用的淘宝和抖音,大部分都是读场景,而写的流量相对读的流量整体要低不少。但是如果遇到做活动,比如电商促销秒杀;或者一些本身确实写流量高的场景,比如直播(点赞,收礼)等有什么好的解决方案呢,下面通过个人遇到的一些业务场景来介绍实际用到的解决方案以及使用的场景和局限性。

消息队列

使用消息队列进行流量削峰来解决写qps高的问题应该是最常见的解决方案,将每次的请求扔到消息队列,后续由消费者慢慢处理即可,使用消息队列能够自己控制qps和线程数,所以就能够缓解下游的压力了。

使用场景介绍

这里就举两个常见的例子

电商秒杀:每次用户秒杀的结果虽然在页面展示了,实际上一般是把请求丢到了消息队列了,后续慢慢消费处理(db中库存的改动等);

直播间收礼:在一些大V开直播的时候,直播收礼的流量10W/s很正常,服务端需要对礼物进行归类和统计等处理,这显然也可以使用mq去异步处理;

注意事项

消息队列的处理异步处理的,所以对于需要及时响应处理结果的业务可能就不大适合了。当然,对于处理结果比较固定,比如就是返回成功(商城秒杀),然后交由消息队列处理并且保证处理成功的范畴,不在考虑范围。

设置mq的消费速度要评估好,保证不能打垮下游服务;同时对于不断产生消息的业务,也不能让消费速度赶不上生产速度,导致消息不断堆积,引爆队列,必要时要对下游进行适当的横向扩容。总之,要在性能和消费速度之间进行权衡,找到一个平衡点。

流量聚合-BufferTrigger

BufferTrigger引入

流量聚合简单来说就是把多次的请求整合为一个请求处理,显然只有在业务对单词的请求不敏感时并且能接受一定延迟时才能使用,比如直播间点赞,主播对单次点赞根本不敏感,直播间展示的赞数也不是实时的,完全可以对多次点赞进行聚合,最后再进行一些列判断再累计起来存放到db或缓存中,直播间从db或缓存中进行拉取。

那么这个聚合的工具需要什么功能呢?简单来说主要就三点

提供个能存放数据的容器

当达到指定数量时输出容器中的数据

当达到指定时候时输出容器中的数据

刚好快手提供了这么个工具-BufferTrigger,这个工具在快手内部大量使用,尤其是主站直播,下面将简单介绍下这个简单易用的工具。

使用场景

对大量的数据进行聚合,然后进行批量操作,适用于数据量大且相似或相同数据多的任务或者能接受一定时间内的延迟问题

如何使用

引入依赖

<dependency>
  <groupId>com.github.phantomthief</groupId>
  <artifactId>buffer-trigger</artifactId>
  <version>0.2.9</version>
</dependency>

使用方式1

public class BufferTriggerDemo {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long, Map<Long, AtomicInteger>> simple()
            .maxBufferCount(10)
            .interval(4, TimeUnit.SECONDS)
            .setContainer(ConcurrentHashMap::new, (map, uid) -> {
                map.computeIfAbsent(uid, key -> new AtomicInteger()).addAndGet(1);
                return true;
            })
            .consumer(this::consumer)
            .build();



    public void consumer(Map<Long, AtomicInteger> map) {
        System.out.println(map);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        // 最大容量是10,这里尝试添加11个元素0-10
        for (int i = 0; i < 5; i ++) {
            for (long j = 0; j < 11; j ++) {
                bufferTrigger.enqueue(j);
            }
        }

        Thread.sleep(7000);
    }


使用simple方法来进行构建

1.maxBuffeCount(long count): 指定容器最大容量,比如这里指定了10,当在下次聚合前容器元素数量达到10就无法添加了,-1表示无限制;

2.internal(long interval, TimeUnit unit) :表示多久聚合一次,如果没达到时间那么consumer是不会输出的,聚合后容器就空了。

3.setContainer(Supplier<? extends C> factory, BiPredicate<? super C, ? super E> queueAdder): 第一个变量为factory,是个Supplier,获取容器用的,要求线程安全;第二个变量是缓存更新的方法BiPredicate<? super C, ? super E> queueAdder C为容器类型,E为元素类型

4.consumer(ThrowableConsumer<? super C, Throwable> consumer): 表示如何消费聚合后的数据,标识我们如何去消费聚合后的数据,我这里就是简单打印。

5.enqueue(E element): 添加元素;

6.manuallyDoTrigger: 主动触发一次消费,通常在java进程关闭的时候调用

执行一遍,输入如下

{0=5, 1=5, 2=5, 3=5, 4=5, 5=5, 6=5, 7=5, 8=5, 9=5}

使用方式2

每次将元素原封不动保存下来,然后一次性消费一整个列表元素。而上面的方式,每次添加元素都会进行计算。

public class BufferTriggerDemo2 {
     BufferTrigger<Long> bufferTrigger = BufferTrigger.<Long>batchBlocking()
             .bufferSize(50)
             .batchSize(10)
             .linger(Duration.ofSeconds(1))
             .setConsumerEx(this::consume)
             .build();

    private void consume(List<Long> nums) {
        System.out.println(nums);
    }

    public void test() throws InterruptedException {
        // 进程退出时手动消费一次
        Runtime.getRuntime().addShutdownHook(new Thread(() -> bufferTrigger.manuallyDoTrigger()));
        for (long j = 0; j < 60; j ++) {
            bufferTrigger.enqueue(j);
        }

        Thread.sleep(7000);
    }

1.batchBlocking():提供自带背压(back-pressure)的简单批量归并消费能力;

2.bufferSize(int bufferSize): 缓存队列的最大容量;

3.batchSize(int size): 批处理元素的数量阈值,达到这个数量后也会进行消费

4.linger(Duration duration): 多久消费一次

5.setConsumerEx(ThrowableConsumer<? super List, Exception> consumer): 消费函数,注入的对象为缓存队列中尚存的所有元素,非逐个元素消费;

执行一遍,输入如下

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[40, 41, 42, 43, 44, 45, 46, 47, 48, 49]
[50, 51, 52, 53, 54, 55, 56, 57, 58, 59]

full gc陷阱和消费能力提速

需要注意的是BufferTrigger是单线程消费的,这个是一个比较大的陷阱,做活动的时候是踩了坑的,尤其是在消费操作中涉及到大量io操作的场景,因为在流量很高的时候可能会出现消费速度跟不上生产速度,这很容易导致full gc问题。所以如果有必要的话需要使用线程池来提升消费速度。

消息队列和BufferTrigger的组合方式

有没有考虑过消息队列的消费速度过慢,如何在不影响下游性能的情况下提升消费速?比如直播点赞,在mq每次收到一条点赞消息的时候是不是就可以使用BufferTrigger来进行聚合?然后每分钟消费一次,在流量剧增的时候是不是能十倍以上的提速?

散库散表

从框架层面我介绍了上面两种解决方案,其实使用这两种方案主要是为了缓解数据库压力,尤其是单表的情况。但是高流量一般涉及到的都是用户维度的流量,所以如果必要的话可以根据userId来进行分库分表(我一般习惯就叫散表)并且优化表的设计,需要注意的是,最好不要分了表而不分库,因为多表共用一个资源性能提升还是不大。比如我们内部分表都是1000张,库都是至少10个库(集群)。这样处理一般的请求上万的tps是完全没问题的。

下面将从业务设计(技巧)的层面来介绍如何处理写qps过高的问题。

请求丢弃

在很多业务场景下,丢弃部分请求完全不影响业务,给个合理的提示的话用户根本无法感知,这其中最常见的就是秒杀、抢红包、发弹幕这类业务,请求量很大,但只有少部分请求能拿到钱,大部分请求直接丢弃都是没问题的,到时候告诉用户没抢到或者抢没了或者弹幕发生成功即可。

预处理

这类场景在抢红包的时候经常用到,可能很多人都知道很多app抢红包其实并不是在用户每次去抢的时候再去计算金额并落库的,一般是在红包发出去的时候就计算好了,比如1W块钱,100个人,那么就会随机生成100个数额推入到缓存的队列,用户抢的时候从队列直接pop即可。

当然,可以通过消息队列异步落库,也就是同步写缓存,异步写db。

又或者是在用户完成活动的时候得到一个资格,然后在指定的时间点去瓜分1W块钱,其实也是在这时间点之前就把能用的金额和获得资格的人拿出来,计算出每个人的金额再存储到缓存和db,最后用户虽然说是去抢,其实就是从缓存中把已经给他存好的钱拿出来了而已。

流量打散

这种方案可能用到的机会不是太多,组里有老人做过所以拿来借鉴。其实这种手段在活动尤其春节活动的时候常见,特点是先互动后开奖(即在业务能接受一定的延迟情况)。快手的20年春节活动就是这样,先播一段春节明星拜年的视频,视频播放完之后弹出抢红包按钮。在这一瞬间流量肯定是非常高的,如果在这时候再去算每个人的钱对服务性能要求就太高了,这是没必要去做的挑战。因为在视频播放的时候我们就可以计算出来金额。

具体的做法是,服务端给客户端返回一个打散最大时间,比如30S,那么客户端就生成随机生成一个(0,30)的时间t,并在过now+t的时候去请求服务端拿到用户的金额,用户点击抢红包的时候就能直接告诉他这个金额。

#java后端#
全部评论

相关推荐

05-10 17:11
门头沟学院 Java
秋招过去了好久,是时候更新一下面经了一面-&nbsp;拷打实习项目-&nbsp;实习项目亮点-&nbsp;拷打项目(折磨)-&nbsp;为什么要用两级缓存-&nbsp;caffine淘汰策略(没看过)-&nbsp;为什么本地用top50,我说是top30行不行,(预估,预热)-&nbsp;如果千万级是什么方案-&nbsp;为什么要牺牲一致性(CAP,&nbsp;BASE扯了下)-&nbsp;1000w用户需要怎么做-&nbsp;定时器放在那里-&nbsp;怎么做数据预热-&nbsp;这里battle了巨久,感觉没有回答想要的点-&nbsp;springboot启动流程-&nbsp;java&nbsp;bean是什么(这里我说get&nbsp;set方法,他说应该从IOC里面说)-&nbsp;IOC是什么-&nbsp;IOC有什么好处(说了解耦,他问还有呢,从使用者和组件开发者的角度,我是真不会啊)-&nbsp;又扯了巨久,真不会回答-&nbsp;手撕:验证搜索二叉树二面-&nbsp;项目拷打吧20min,其实感觉也没讲明白-&nbsp;raft协议-&nbsp;raft能应对脑裂吗-&nbsp;ES原理-&nbsp;有实际运维部署经验吗)无-&nbsp;时间久远其他问题记不得了-&nbsp;手撕:交叉链表三面-&nbsp;拷打项目-&nbsp;说说SQL的执行的整个流程-&nbsp;为什么要用逻辑执行计划-&nbsp;你知道MySQL优化器会优化那些内容吗-&nbsp;innodb引擎索引结构-&nbsp;二级索引结构-&nbsp;b+树和b树有什么优势-&nbsp;为什么二级索引叶子节点要放主键值而不是一个指针)说的页分裂不知道对不对-&nbsp;知道最左匹配原则吗-&nbsp;undo&nbsp;log,&nbsp;redo&nbsp;log,&nbsp;bin&nbsp;log都说说-&nbsp;redo&nbsp;log写到内存里如何保证能刷盘(3个参数)-&nbsp;事务两阶段提交的过程-&nbsp;MVCC实现的原理-&nbsp;进程和线程的区别-&nbsp;用户态和内核态的区别-&nbsp;怎么从用户态切换到内核态-&nbsp;在编程的时候如何减少用户态到内核态的切换)这里纯在乱答-&nbsp;协程有了解吗-&nbsp;说下多路IO复用-&nbsp;讲下4次挥手)捏马的有点忘了状态名字了,说了两遍才说懂-&nbsp;为什么time_wait是2MSL为什么不是1MSL,为什么不是3MSL-&nbsp;fork知道吗,fork返回的值是什么-&nbsp;a&nbsp;=&nbsp;fork()&nbsp;b=fork()&nbsp;print(a,b)&nbsp;这个最后产生几个进程,打印的内容是什么-&nbsp;了解哪些排序-&nbsp;快排复杂度推导一下-&nbsp;归并的复杂度推导一下-&nbsp;链表做归并的时候需要从中间节点断开,这个相比归并数组会影响时间复杂度吗-&nbsp;LRU思路讲下-&nbsp;手撕:链表排序-&nbsp;一共一个半小时,强度有点大,有些推导性质的东西确实不记得了,只记得结论了。还得下来多看下
点赞 评论 收藏
分享
05-25 22:01
已编辑
东北大学 Java
字节搜索二面挂当天被捞1、自我介绍2、你提到了用户的关注与取关,你用户关系服务是怎么设计的?(定义了关注表与粉丝表,两个表内容一致)3、你怎么保证两个表内容一致的?(目前是通过事务保证的,后面其实还可以通过订阅&nbsp;binlog&nbsp;伪从来保证一致性)3、如果是大&nbsp;V&nbsp;的情况,你有考虑到吗,做了哪些处理应对这种高并发(Redis&nbsp;缓存+二级缓存,冷热数据分离)4、分布式&nbsp;ID&nbsp;你都用来生成什么&nbsp;ID&nbsp;的?(笔记&nbsp;ID,用户&nbsp;ID,用户&nbsp;ID&nbsp;用的号段模式,笔记&nbsp;ID&nbsp;考虑到雪花算法自带的时间戳可以实现冷热数据分离,发布久远的笔记不缓存在&nbsp;redis,后由于点赞系统采用咆哮位图高效判断,但咆哮位图基本只能存储&nbsp;32&nbsp;位,遂也改为号段模式生成,生成效率基本没差多少)5、那你说说点赞系统怎么设计的?为什么改为咆哮位图了?(先是采用&nbsp;Set&nbsp;数据结构判断,后因为满足高并发需求,Set&nbsp;模式占用内存太多,又改用布隆过滤器实现,大大降低内存占用。但布隆过滤器在判断存在时存在误判,需要从数据库进行二次校验。后改用咆哮位图,既能高效判断点赞与否,内存占用也大大降低)6、那你讲一下咆哮位图的机制,为什么有你说的这些优点?7、MySQL&nbsp;了解吧,你讲一下&nbsp;MySQL&nbsp;的索引(一顿吟唱)8、说一下聚簇索引和非聚簇索引的区别9、联合索引再说一下,如何定义联合索引最好?(设计成覆盖索引)10、联合索引的顺序重要吗?(顺便再说一下索引下推)11、算法1:二叉树展开为链表12、算法2:根据层序遍历建树反问
字节跳动一面1190人在聊 查看13道真题和解析
点赞 评论 收藏
分享
评论
3
5
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务