多线程——生产者消费者问题、线程池

例子见Github-JavaSE-Day07

线程的生产者和消费者

  1. 生产产品,将产品放置到共享空间中

  2. 消费者从共享空间中取走产品

  3. 如果不加同步,多线程访问的时候,出现了数据安全的问题:
    1) 生产者没有生产产品,消费者就可以获取
    解决:
    在Goods类中设置一个flag标志,代表共享空间中有无商品
    当生产者抢占到cpu资源,会判断当前对象是否有值:
     如果有意味着有商品,需要提醒消费者消费,同时当前线程进入阻塞状态,等待消费者取走商品之后再次生产;如果没有,不用进入阻塞状态,直接生产即可.
     如果flag等于false,意味着生产者没有生产商品,此时消费者无法消费,需要让消费者线程进入阻塞状态,等待生产者生产。当有商品之后再开始消费.
    用wait()和notify()实现
    2) “旺仔—矿泉水”出现品牌名称不匹配情况(因为不是原子操作)
    解决:将Consumer类和Producer类中的生产、取走产品操作包装成同步方法放到Goods中,而在Consumer类和Producer类中直接调用Goods中的get()set(),这样保证了原子性

    现在多使用JUC(Java Util Concurrent)来完成

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // Test.java
    public class Test {
    public static void main(String[] args) {
    BlockingQueue<Goods> blockingQueue = new ArrayBlockingQueue<>(5);
    ProducerQueue producerQueue = new ProducerQueue(blockingQueue);
    ConsumerQueue consumerQueue = new ConsumerQueue(blockingQueue);
    new Thread(producerQueue).start();
    new Thread(consumerQueue).start();
    }
    }

Java线程池

为什么需要线程池?
线程很占系统资源,如果对线程管理不善,很容易导致系统问题。在大多时并发框架中都会使用线程池来管理线程。
好处

  1. 可以重复利用已有的线程继续执行任务,避免线程创建和销毁时造成的损耗,可以提高系统响应速度
  2. 可以对线程合理管理,根据系统的承受能力调整可运行线程的数量、大小等

注:

  1. 需要一个阻塞队列来存放我们的对应任务列表。
  2. 线程池是否已满和核心线程池是否已满的区别:后者是指我允许创建的最大线程个数是否满了。
  3. 如果线程池已满,后面进来的线程怎么处理?那就按照饱和策略(拒绝策略)处理。
    1)放到队列里
    2)指定哪些执行哪些不执行

线程池分类

在这里插入图片描述

ThreadPoolExecutor

不论是public static ExecutorService newSingleThreadExecutor(),还是public static ExecutorService newFixedThreadExecutor()public static ExecutorService newCacheThreadExecutor(),里面核心都是 new ThreadPoolExecutor

newCacheThreadPool

创建一个可根据需要创建新线程的线程池,以前构造的线程可用时会重用它们,并在需要时使用提供的ThreadFactory创建新线程。
特征

  1. 线程池中数量不固定,可达到最大值(Integer.MAX_VALUE)
  2. 线程池中的线程可进行缓存重复利用回收
  3. 当线程池中无可用线程,会创建一个新的

例子

Task.java
public class Task implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " running");
    }
}
CacheThreadPoolDemo.java
/**
 * Executor是所有要执行线程的一个父类,是一个接口
 * ExecutorService是继承于Executor的子接口
 */
public class CacheThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // ExecutorService从Executor 继承来的方法
        // Task.java是一个继承了Runnable接口的类
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Task());
        }
        // 启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务
        executorService.shutdown();
    }
}
运行结果
pool-1-thread-3 running
pool-1-thread-4 running
pool-1-thread-2 running
pool-1-thread-5 running
pool-1-thread-15 running
pool-1-thread-1 running
pool-1-thread-16 running
pool-1-thread-14 running
pool-1-thread-9 running
pool-1-thread-12 running
pool-1-thread-10 running
pool-1-thread-13 running
pool-1-thread-11 running
pool-1-thread-8 running
pool-1-thread-6 running
pool-1-thread-7 running
pool-1-thread-20 running
pool-1-thread-19 running
pool-1-thread-18 running
pool-1-thread-17 running

结果:如果Task.java中加了sleep(),那么运行结果中就就不会出现线程的复用。

newFixedThreadPool

特征

  1. 线程池中线程数量可以指定
  2. 线程可以重复被利用,在显示关闭之前,都将一直存在
  3. 超出一定量的线程被提交时需要在队列中等待

例子

FixedThreadPoolDemo.java
public class FixedThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Task());
        }
        executorService.shutdown();
    }
}
运行结果
pool-1-thread-4 running
pool-1-thread-5 running
pool-1-thread-1 running
pool-1-thread-3 running
pool-1-thread-2 running
pool-1-thread-3 running
pool-1-thread-2 running
pool-1-thread-5 running
pool-1-thread-1 running
pool-1-thread-4 running
pool-1-thread-4 running
pool-1-thread-3 running
pool-1-thread-1 running
pool-1-thread-2 running
pool-1-thread-5 running
pool-1-thread-1 running
pool-1-thread-3 running
pool-1-thread-2 running
pool-1-thread-4 running
pool-1-thread-5 running

结果:如果Task.java中加了sleep(),可以明显看出,运行时是每五个为一组显示的。因为线程池中只有5个线程。

newSingleThreadPool

特征

  1. 线程池中数量为1,之后提交的线程将排在队列中依次执行

例子

SingleThreadPoolDemo.java
public class FixedThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadPool(5);
        for (int i = 0; i < 20; i++) {
            executorService.execute(new Task());
        }
        executorService.shutdown();
    }
}
运行结果
pool-1-thread-1 running
pool-1-thread-1 running
……
ScheduledThreadPoolExecutor
newsingleThreadScheduledExecutor

创建一个单线程池执行程序,它可以安排在给定延迟后运行命令或者定期执行。
特征

  1. 线程池中最多执行1个线程,之后提交的线程活动将会排在队列中依次执行
  2. 可以延迟执行,也可以定期执行
newScheduledThreadPool

创建一个线程池,它可以安排在给定延迟后运行命令或者定期执行。
特征

  1. 线程池中线程数量可指定
  2. 可以延迟执行,也可以定期执行

例子

ScheduledThreadPoolDemo.java
public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
        System.out.println(System.currentTimeMillis());
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                System.out.println("延迟3s执行");
                System.out.println(System.currentTimeMillis());
            }
        }, 3, TimeUnit.SECONDS);
        scheduledExecutorService.shutdown();
    }
ScheduledThreadPoolDemo.java
public static void main(String[] args) {
    ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
    System.out.println(System.currentTimeMillis());
    scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            System.out.println("1----延迟1s执行,每3s执行一次");
            System.out.println(System.currentTimeMillis());
        }
    }, 1, 3,TimeUnit.SECONDS);
}
ForkJoinPool

分而治之:把一个任务差分成n多个任务,最终再汇总到一起。

newWorkStealingPool

创建一个带并行级别的线程池,并行级别决定同一时刻有多少个线程在执行。不传参默认为当前系统的CPU个数
特征

  1. 可以设置并行级别parallelism,即默认每时刻只有parallelism个线程同时执行
  2. 多CPU充分利用每个CPU的资源,多线程同时并行来进行执行,所以比较耗费电脑中的CPU

线程池的生命周期

注意与线程的生命周期区分!!!
在这里插入图片描述

  1. 运行状态:RUNNING 能接受新任务,也能处理阻塞队列中的任务。
  2. 终止状态:TERMINATED 在terminated()执行完后进入该状态,默认terminated()中什么也没做。

从运行到终止状态会有三种状态的过渡:

  1. SHUTDOWN:调用shutdown()切换到SHUTDOWN状态。不能再接受新任务,但能继续处理阻塞队列中保存的任务。
  2. STOP:调用shutdownnow()切换到STOP状态。不能再接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
  3. TIDYING:最终会进行一个回收工作。
    SHUTDOWN状态会将当前执行的任务执行完,阻塞队列为空,线程池中工作线程为0,进入TIDYING状态;STOP状态直接将任务都干掉,线程池中工作线程数量为0,进入TIDYING。
    如果所有任务都已终止,workerCount(有效线程数)为0,线程池进入该状态后会调用terminated()进入TERMINATED状态。

shutdown()shutdownnow()好。shutdownnow()比较暴力。

线程池的创建

不论是public static ExecutorService newSingleThreadExecutor(),还是public static ExecutorService newFixedThreadExecutor()public static ExecutorService newCacheThreadExecutor(),里面核心都是 new ThreadPoolExecutor
在这里插入图片描述
ThreadPoolExecutor中有多个构造方法,但无论是哪个,最终都是调用了7个参数的构造方法。
掌握各个参数的含义!

  1. int corePoolSize: 核心线程池数 当前线程池可以容纳的一个标准值。
  2. int maximumPoolSize:最大线程池数 如果大于核心线程数,小于最大线程池数,就可以接着new Thread来处理我们的任务。
  3. long keepAliveTime:存活时间 表示对应的存活时间,多长时间把没用的线程销毁
  4. TimeUnit unit :存活时间单位
  5. BlockingQueue workQueue:阻塞队列 任务处理不过来,暂时把任务放到阻塞队列中,等线程有空闲再来处理
  6. ThreadFactory threadFactory:工厂模式 里面是创建我们对应的一个线程的 这个代码一般我们不会自己写
  7. RejectedExecutionHandler handler拒绝策略 超过了最大线程池数,采取的策略

公交车座位17个corePoolSize),但是包括站位最多可以有37个(maximumPoolSize)。
上班高峰期,所以在车上额外临时加了10个座位,那么此时公交车里面可以坐27人了。而过了高峰期之后,甚至17个都坐不满了。这时候可以撤掉这10个座位,而到撤掉的这段时间限制就是keepAliveTime,时间单位是TimeUnit unit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

阻塞队列

ArrayBlockingQueue

  1. 基于数组实现的阻塞队列。ArrayBlockingQueu内部维护了一个数据缓冲定长数组
  2. 如果应用在生产者和消费者这样的应用场景中,它并不能实现两者真正意义上的并行执行。因为两者共用一个锁对象

LinkedBlockingQueue

  1. 基于链表实现的阻塞队列。内部维护了一个数据缓冲队列(链表实现)。
  2. 只有当队列缓冲区达到最大缓存容量时候,才会阻塞生产者队列,直到消费者消费掉一份数据,生产者线程才会被唤醒。
  3. LinkedBlockingQueue之所以能高效处理并发数据,也是因为其对生产者端和消费者端分别采用了独立的锁来控制数据同步,这意味着在高并发情况下,两者可以并行操作数据,以此提高整个队列的并发性能。

DelayBlockingQueue

  1. DelayBlockingQueue中的元素只有当其指定的延迟时间到了,才能获取该元素。
  2. 没有大小限制的队列,因此生产过程永远不会被阻塞,而只有消费才会被阻塞。
  3. 使用场景较少,但都很精妙。常见如使用它来管理一个超时未响应的连接队列。

PriorityBlockingQueue

  1. 基于优先级的阻塞队列。
  2. PriorityBlockingQueue不会阻塞生产者,只会阻塞消费者。所以要注意生产速度不能快于消费者,否则时间一长,会最终耗尽所有可用的堆内存空间。
  3. 内部控制线程同步的锁采用的是公平锁。

SynchronousQueue

  1. 一种无缓冲的等待队列,不可以暂存数据。类似于集市上无中介的直接交易。相对于有缓冲的BlockingQueue,少了一个中间商环节。
  2. 声明一个SynchronousQueue有两者方式。
    · 公平模式:采用公平锁,并配合一FIFO队列来阻塞多余的生产者消费者,从而体现整体的公平策略。
    · 非公平模式:采用非公平锁,并配合一LIFO队列来管理多余的生产者消费者,同时来进行一个抢占执行。

ArrayBlockingQueue和LinkedBlockingQueue的区别?

  1. 队列中锁的实现不同
    ArrayBlockingQueue实现的队列中锁没有分离,即生产消费共用一个锁;
    LinkedBlockingQueue实现的队列中锁是分离的,即生产用的是putLock,消费用的是takeLock
  2. 队列大小初始化方式不同
    ArrayBlockingQueue实现的队列中必须指定队列大小;
    LinkedBlockingQueue实现的队列中可以不指定,但默认是Integer.MAX_VALUE

饱和策略(也称拒绝策略)

  1. AbortPolicy (默认策略)
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExcutionException异常
  2. DiscardPolicy
    ThreadPoolExecutor.DiscardPolicy:丢弃任务,但不抛出异常
  3. DiscardOdestPolicy
    ThreadPoolExecutor.DiscardOdestPolicy:丢弃队列最前面的任务,然后重新尝试一个新的任务(重复此过程)
  4. CallerRunsPolicy
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,队列中的任务继续阻塞

第一种比较好,通过异常检测机制,检测到满了之后可以相应增加容量进行对应调整。

execute()

Executor接口中。

1
2
3
4
// Executor.java
public interface Executor {
void execute(Runnable command);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get(); // ct1是一个原子操作
// 1. 如果工作线程数小于核心线程数(核心线程池没满)
if (workerCountOf(c) < corePoolSize) {
// 创建新线程执行command任务,成功返回
if (addWorker(command, true))
return;
// 创建失败
c = ctl.get();
}
/**
* 2. 判断缓冲队列是否满了,未满将command存储到阻塞队列中
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
/**
* rejectedExecution:当没有更多的线程或队列插槽时,可能会发生调用
* 如果一个任务是能被加入到队列的,但是我们还需要再次检查是否应该添加线程。因为:
* 1) 自上次检查以来已有的线程死亡或者自从进入这个方法以来线程池就关闭了。
* 如果是SHUTDOWN状态,不能再接受新任务,但能继续处理阻塞队列中保存的任务;如果是STOP状态,会中断正在处理任务的线程。
* 所以我们需要再次检查的状态,如果终止了要回退队列,如果没有增加线程。
* remove()从队列中移除command后会尝试terminated()
*/
// 当前运行线程数量为0,移除command
if (! isRunning(recheck) && remove(command))
reject(command); //为command调用rejectedExecution拒绝任务
// 工作线程数为0
else if (workerCountOf(recheck) == 0)
addWorker(null, false); //使用maximumPoolSize作为约束创建新线程
}
/**
* 3. 如果一个任务不能被加入到队列(阻塞队列已满),创建新线程来执行任务
* 如果不能再增加新的线程(线程池已满),调用拒绝策略RejectedExcutionHandler来处理
*/
else if (!addWorker(command, false))
reject(command);
}

注:

  1. workcount是允许启动而不允许停止的线程(核心线程池没满时候创建的线程)
  2. isRunning的是正在运行的线程

execute方法执行逻辑

corePoolSize的线程不会被回收吗?
如果最大线程数是100,核心线程数为40,意味着这40个线程是不会被回收的,而超出的60个在一定时间后会被回收

Executor和Submit

Executor接口某个对应的子类实现(ExecutorService接口)中,多了几个方法。其中一个就是submit方法。
submit是基于方法Executor.execute(Runnable)的延伸,通过创建并返回一个Future类对象可用于取消执行和/或等待完成
对于Future接口,其方法有:
在这里插入图片描述

execute方法和submit方法提交的区别:

  1. 如果execute()提交并不会有返回值,而用submit()提交会有Future类返回值。
  2. execute()里面不支持传入Callable接口,而submit()既支持Callable接口又支持Runnable接口

用execute方法还是submit方法提交?
如果想要返回值,就用execute(),如果不需要返回值,不需要逻辑判断,此时就直接execute()即可。

线程池的关闭

自定义线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(5));
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
System.out.println(Math.random());
}
});
threadPoolExecutor.shutdown();
}
}