Java并发看这篇就够了

0 联系我

1.Q群【Java开发技术交流】:https://jq.qq.com/?_wv=1027&k=5UB4P1T
2.简书博客:www.shishusheng.com
3.知乎:http://www.zhihu.com/people/shi-shu-sheng-
4.微信公众号:JavaEdge
5.Github:https://github.com/Wasabi1234

1 相关代码,请见/下载于

https://github.com/Wasabi1234/concurrency

高并发处理的思路及手段

1 基本概念

1.1 并发

同时拥有两个或者多个线程,如果程序在单核处理器上运行多个线程将交替地换入或者换出内存,这些线程是同时“存在"的,每个线程都处于执行过程中的某个状态,如果运行在多核处理器上,此时,程序中的每个线程都将分配到一个处理器核上,因此可以同时运行.

1.2 高并发( High Concurrency)

互联网分布式系统架构设计中必须考虑的因素之一,通常是指,通过设计保证系统能够同时并行处理很多请求.

1.3 区别与联系

  • 并发: 多个线程操作相同的资源,保证线程安全,合理使用资源

  • 高并发:服务能同时处理很多请求,提高程序性能

2 CPU

2.1 CPU 多级缓存

  • 为什么需要CPU ***
    CPU的频率太快了,快到主存跟不上
    如此,在处理器时钟周期内,CPU常常需要等待主存,浪费资源。所以***的出现,是为了缓解CPU和内存之间速度的不匹配问题(结构:cpu-> ***-> memory ).

  • CPU ***的意义
    1) 时间局部性
    如果某个数据被访问,那么在不久的将来它很可能被再次访问
    2) 空间局部性
    如果某个数据被访问,那么与它相邻的数据很快也可能被访问

    2.2 缓存一致性(MESI)

    用于保证多个 CPU *** 之间缓存共享数据的一致

  • M-modified被修改
    该缓存行只被缓存在该 CPU 的缓存中,并且是被修改过的,与主存中数据是不一致的,需在未来某个时间点写回主存,该时间是允许在其他CPU 读取主存中相应的内存之前,当这里的值被写入主存之后,该缓存行状态变为 E

  • E-exclusive独享
    缓存行只被缓存在该 CPU 的缓存中,未被修改过,与主存中数据一致
    可在任何时刻当被其他 CPU读取该内存时变成 S 态,被修改时变为 M态

  • S-shared共享
    该缓存行可被多个 CPU 缓存,与主存中数据一致

  • I-invalid无效

  • 乱序执行优化
    处理器为提高运算速度而做出违背代码原有顺序的优化

并发的优势与风险

![](https://upload-images.jianshu.io/upload_images/4685968-e083e9bf164b7d73.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

3 项目准备

3.1 项目初始化

![自定义4个基本注解](https://upload-images.jianshu.io/upload_images/4685968-9ea512f5c1b3b4ea.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![随手写个测试类](https://upload-images.jianshu.io/upload_images/4685968-7bc23c076be35936.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![运行正常](https://upload-images.jianshu.io/upload_images/4685968-c7fba914bceb792e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

3.2 并发模拟-Jmeter压测

![](https://upload-images.jianshu.io/upload_images/4685968-8c994437f5663dd6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/4685968-0a6f7c5e0217aa17.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/4685968-646e66ad4e2c7ffc.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![log view 下当前日志信息](https://upload-images.jianshu.io/upload_images/4685968-cc70555c9f3b78b2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![图形结果](https://upload-images.jianshu.io/upload_images/4685968-01c21bb9069bcd28.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

3.3 并发模拟-代码

CountDownLatch

![可阻塞线程,并保证当满足特定条件时可继续执行](https://upload-images.jianshu.io/upload_images/4685968-0d0481ec5302cadb.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

Semaphore(信号量)

![可阻塞线程,控制同一时间段内的并发量](https://upload-images.jianshu.io/upload_images/4685968-3133af3b8e419a39.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
以上二者通常和线程池搭配

下面开始做并发模拟

package com.mmall.concurrency;
import com.mmall.concurrency.annoations.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author shishusheng
 * @date 18/4/1
 */
@Slf4j
@NotThreadSafe
public class ConcurrencyTest {
    /**
     * 请求总数
     */
    public static int clientTotal = 5000;
    /**
     * 同时并发执行的线程数
     */
    public static int threadTotal = 200;
    public static int count = 0;
    public static void main(String[] args) throws Exception {
        //定义线程池
        ExecutorService executorService = Executors.newCachedThreadPool();
        //定义信号量,给出允许并发的线程数目
        final Semaphore semaphore = new Semaphore(threadTotal);
        //统计计数结果
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        //将请求放入线程池
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    //信号量的获取
                    semaphore.acquire();
                    add();
                    //释放
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        //关闭线程池
        executorService.shutdown();
        log.info("count:{}", count);
    }
    /**
     * 统计方法
     */
    private static void add() {
        count++;
    }
}

运行发现结果随机,所以非线程安全

4线程安全性

4.1 线程安全性

当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的

4.2 原子性

4.2.1 Atomic 包

  • AtomicXXX:CAS,Unsafe.compareAndSwapInt
    提供了互斥访问,同一时刻只能有一个线程来对它进行操作
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class AtomicExample2 {
    /**
     * 请求总数
     */
    public static int clientTotal = 5000;
    /**
     * 同时并发执行的线程数
     */
    public static int threadTotal = 200;
    /**
     * 工作内存
     */
    public static AtomicLong count = new AtomicLong(0);
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    System.out.println();
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        //主内存
        log.info("count:{}", count.get());
    }
    private static void add() {
        count.incrementAndGet();
        // count.getAndIncrement();
    }
}
package com.mmall.concurrency.example.atomic;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.atomic.AtomicReference;
/**
 * @author shishusheng
 * @date 18/4/3
 */
@Slf4j
@ThreadSafe
public class AtomicExample4 {
    private static AtomicReference count = new AtomicReference<>(0);
    public static void main(String[] args) {
        // 2
        count.compareAndSet(0, 2);
        // no
        count.compareAndSet(0, 1);
        // no
        count.compareAndSet(1, 3);
        // 4
        count.compareAndSet(2, 4);
        // no
        count.compareAndSet(3, 5); 
        log.info("count:{}", count.get());
    }
}

输出结果

  • AtomicReference,AtomicReferenceFieldUpdater

  • AtomicBoolean

  • AtomicStampReference : CAS的 ABA 问题

4.2.2 锁

synchronized:依赖 JVM
  • 修饰代码块:大括号括起来的代码,作用于调用的对象

  • 修饰方法: 整个方法,作用于调用的对象

  • 修饰静态方法:整个静态方法,作用于所有对象

package com.mmall.concurrency.example.count;
import com.mmall.concurrency.annoations.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
 * @author shishusheng
 */
@Slf4j
@ThreadSafe
public class CountExample3 {
    /**
     * 请求总数
     */
    public static int clientTotal = 5000;
    /**
     * 同时并发执行的线程数
     */
    public static int threadTotal = 200;
    public static int count = 0;
    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(threadTotal);
        final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
        for (int i = 0; i < clientTotal ; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        log.info("count:{}", count);
    }
    private synchronized static void add() {
        count++;
    }
}

synchronized 修正计数类方法

  • 修饰类:括号括起来的部分,作用于所有对象
    子类继承父类的被 synchronized 修饰方法时,是没有 synchronized 修饰的!!!

Lock: 依赖特殊的 CPU 指令,代码实现

4.2.3 对比

  • synchronized: 不可中断锁,适合竞争不激烈,可读性好

  • Lock: 可中断锁,多样化同步,竞争激烈时能维持常态

  • Atomic: 竞争激烈时能维持常态,比Lock性能好; 只能同步一
    个值

4.3 可见性

一个线程对主内存的修改可以及时的被其他线程观察到

###4.3.1 导致共享变量在线程间不可见的原因

  • 线程交叉执行

  • 重排序结合线程交叉执行

  • 共享变量更新后的值没有在工作内存与主存间及时更新

4.3.2 可见性之synchronized

JMM关于synchronized的规定

  • 线程解锁前,必须把共享变量的最新值刷新到主内存

  • 线程加锁时,将清空工作内存***享变量的值,从而使
    用共享变量时需要从主内存中重新读取最新的值(加锁与解锁是同一把锁)

4.3.3 可见性之volatile

通过加入内存屏障和禁止重排序优化来实现
  • 对volatile变量写操作时,会在写操作后加入一条store
    屏障指令,将本地内存中的共享变量值刷新到主内存

  • 对volatile变量读操作时,会在读操作前加入一条load
    屏障指令,从主内存中读取共享变量
    volatile 写
    volatile 读
    计数类之 volatile 版,非线程安全的

  • volatile使用

volatile boolean inited = false;
//线程1:
context = loadContext();
inited= true;
// 线程2:
while( !inited ){
    sleep();
}
doSomethingWithConfig(context)

4.4 有序性

一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序

JMM允许编译器和处理器对指令进行重排序,但是重排序过程不会影响到单线程程序的执行,却会影响到多线程并发执行的正确性

4.4.1 happens-before 规则

5发布对象


发布对象
对象逸出

5.1 安全发布对象


非线程安全的懒汉模式
饿汉模式
线程安全的懒汉模式

package com.mmall.concurrency.example.singleton;
import com.mmall.concurrency.annoations.NotThreadSafe;
/**
 * 懒汉模式 -》 双重同步锁单例模式
 * 单例实例在第一次使用时进行创建
 * @author shishusheng
 */
@NotThreadSafe
public class SingletonExample4 {
    /**
     * 私有构造函数
     */
    private SingletonExample4() {
    }
    // 1、memory = allocate() 分配对象的内存空间
    // 2、ctorInstance() 初始化对象
    // 3、instance = memory 设置instance指向刚分配的内存
    // JVM和cpu优化,发生了指令重排
    // 1、memory = allocate() 分配对象的内存空间
    // 3、instance = memory 设置instance指向刚分配的内存
    // 2、ctorInstance() 初始化对象
    /**
     * 单例对象
     */
    private static SingletonExample4 instance = null;
    /**
     * 静态的工厂方法
     *
     * [@return](/profile/547241) */
    public static SingletonExample4 getInstance() {
        // 双重检测机制 // B
        if (instance == null) {        
            // 同步锁
            synchronized (SingletonExample4.class) { 
                if (instance == null) {
                    // A - 3
                    instance = new SingletonExample4(); 
                }
            }
        }
        return instance;
    }
}


7 AQS

7.1 介绍

数据结构

  • 使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架
  • 利用了一个int类型表示状态
  • 使用方法是继承
  • 子类通过继承并通过实现它的方法管理其状态{acquire 和release} 的方法操纵状态
  • 可以同时实现排它锁和共享锁模式(独占、共享)

同步组件

CountDownLatch

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

*   @author shishusheng

*   /
    @Slf4j
    public class CountDownLatchExample1 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

    ExecutorService exec = Executors.newCachedThreadPool();
      final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
      for (int i = 0; i < threadCount; i++) {
          final int threadNum = i;
          exec.execute(() -> {
              try {
                  test(threadNum);
              } catch (Exception e) {
                  log.error("exception", e);
              } finally {
                  countDownLatch.countDown();
              }
          });
      }
      countDownLatch.await();
      log.info("finish");
      exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
    Thread.sleep(100);
      log.info("{}", threadNum);
      Thread.sleep(100);


    }
}
package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**

*   指定时间内处理任务

*   @author shishusheng

*   /
    @Slf4j
    public class CountDownLatchExample2 {

    private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

    ExecutorService exec = Executors.newCachedThreadPool();
      final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
      for (int i = 0; i < threadCount; i++) {
          final int threadNum = i;
          exec.execute(() -> {
              try {
                  test(threadNum);
              } catch (Exception e) {
                  log.error("exception", e);
              } finally {
                  countDownLatch.countDown();
              }
          });
      }
      countDownLatch.await(10, TimeUnit.MILLISECONDS);
      log.info("finish");
      exec.shutdown();

    }

    private static void test(int threadNum) throws Exception {

    Thread.sleep(100);
      log.info("{}", threadNum);

    }
}

Semaphore用法

![](https://upload-images.jianshu.io/upload_images/4685968-e6cbcd4254c642c5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/4685968-dbefbf2c76ad5a2a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/4685968-41f5f5a5fd135804.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

CycliBarrier

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**

*   [@author shishusheng](/profile/5)

*   [/
    @Slf4j
    public class CyclicBarrierExample1 {](/profile/5)

    [private static CyclicBarrier barrier = new CyclicBarrier](/profile/5);

    public static void main(String[] args) throws Exception {

    ExecutorService executor = Executors.newCachedThreadPool();
      for (int i = 0; i < 10; i++) {
          final int threadNum = i;
          Thread.sleep(1000);
          executor.execute(() -> {
              try {
                  race(threadNum);
              } catch (Exception e) {
                  log.error("exception", e);
              }
          });
      }
      executor.shutdown();


    }

    private static void race(int threadNum) throws Exception {


    Thread.sleep(1000);
      log.info("{} is ready", threadNum);
      barrier.await();
      log.info("{} continue", threadNum);


    }
}

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**

*   [@author shishusheng](/profile/5)

*   [/
    @Slf4j
    public class CyclicBarrierExample2 {](/profile/5)

    [private static CyclicBarrier barrier = new CyclicBarrier](/profile/5);

    public static void main(String[] args) throws Exception {

    ExecutorService executor = Executors.newCachedThreadPool();
      for (int i = 0; i < 10; i++) {
          final int threadNum = i;
          Thread.sleep(1000);
          executor.execute(() -> {
              try {
                  race(threadNum);
              } catch (Exception e) {
                  log.error("exception", e);
              }
          });
      }
      executor.shutdown();


    }

    private static void race(int threadNum) throws Exception {


    Thread.sleep(1000);
      log.info("{} is ready", threadNum);
      try {
          barrier.await(2000, TimeUnit.MILLISECONDS);
      } catch (Exception e) {
          log.warn("BarrierException", e);
      }
      log.info("{} continue", threadNum);


    }
}

await 超时导致程序抛异常

package com.mmall.concurrency.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**

*   @author shishusheng

*   /
    @Slf4j
    public class SemaphoreExample3 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

    ```
    ExecutorService exec = Executors.newCachedThreadPool();
      final Semaphore semaphore = new Semaphore(3);
      for (int i = 0; i < threadCount; i++) {
          final int threadNum = i;
          exec.execute(() -> {
              try {
                  // 尝试获取一个许可
                  if (semaphore.tryAcquire()) {
                      test(threadNum);
                      // 释放一个许可
                      semaphore.release();
                  }
              } catch (Exception e) {
                  log.error("exception", e);
              }
          });
      }
      exec.shutdown();


    }

    private static void test(int threadNum) throws Exception {


    log.info("{}", threadNum);
      Thread.sleep(1000);


    }

}

9 线程池

9.1 newCachedThreadPool

9.2 newFixedThreadPool

9.3 newSingleThreadExecutor

看出是顺序执行的

9.4 newScheduledThreadPool


10 死锁


#Java#
全部评论
为我的java并发学习给出了大纲。感谢
点赞 回复
分享
发布于 2019-01-09 15:27

相关推荐

点赞 评论 收藏
转发
5 47 评论
分享
牛客网
牛客企业服务