本文共 8566 字,大约阅读时间需要 28 分钟。
我们知道多个线程可以并行执行多个任务,当任务执行完毕后,线程进入死亡状态,Thread对象等待JVM回收,如果我们的需求是需要持续的稳定的创建线程执行任务,可能会导致线程栈内存过大,导致JVM发生StackOverflowError错误。
因为线程的创建和销毁是非常消耗资源的,所以对于频繁使用线程的项目,应该考虑使用线程池技术,线程池维护着一定数量的线程,会对执行完任务的线程进行回收,减少线程的创建和销毁,避免资源过多的消耗。
在java中,线程池是ThreadPoolExecutor类,这个类有4个构造方法:参数比较多,这里就说下最长的构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueueworkQueue)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,RejectedExecutionHandler handler)public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory, RejectedExecutionHandler handler)
最长的ThreadPoolExecutor的构造有7个参数。
TimeUnit.SECONDS 单位秒 TimeUnit.MILLISECONDS 单位毫秒 TimeUnit.MICROSECONDS 单位微秒(千分之一毫秒) TimeUnit.MINUTES 单位分 TimeUnit.DAYS 单位天 TimeUnit.HOURS 单位消失 TimeUnit.NANOSECONDS 单位纳秒(千分之一微秒)
workQueue:用来保存等待被执行的任务的阻塞队列。
BlockingQueue阻塞队列的子类比较多当然也可以自己实现。阻塞队列当中装载的对象都是必须是Runnable的子类,也就是线程所执行的任务。
java JDK7提供了七个阻塞队列,如下
ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列,此队列按照先进先出(FIFO)的原则对元素进行排序。LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列,此队列的默认和最大长度是Interge.MAX_VALUE。PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列DelayQueue:是一个支持延时获取元素的无界阻塞队列。SynchronousQueue:一个不存储元素的阻塞队列,每一个put操作必须等待一个take操作,否则不能继续添加元素。LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列,所谓双向队列是指可以从队列的两端插入和移除元素。
threadFactory:一个创建线程的工厂类对象
ThreadFactory是一个接口,里面定义了一个方法newThread(Runnable r)
。
public interface ThreadFactory { //创建一个线程 Thread newThread(Runnable r);}
在Executors中有默认的实现方式,如:
//ThreadPoolExecutor第三个构造实现便是调用了该方法Executors.defaultThreadFactory();//Executors类中的静态方法public static ThreadFactory defaultThreadFactory() { //调用的是DefaultThreadFactory类的构造,构造中设置好变量名,等调用newThread()方法时,就可以给Thread对象设置名字,优先级等 return new DefaultThreadFactory();}static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager var1 = System.getSecurityManager(); this.group = var1 != null?var1.getThreadGroup():Thread.currentThread().getThreadGroup(); this.namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable var1) { Thread var2 = new Thread(this.group, var1, this.namePrefix + this.threadNumber.getAndIncrement(), 0L); if(var2.isDaemon()) { var2.setDaemon(false); } if(var2.getPriority() != 5) { var2.setPriority(5); } return var2; }}
handler:线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
1.AbortPolicy:中断(抛出异常)public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }}2.CallerRunsPolicy:在调用线程执行任务,如果执行者(Executor)被关闭,任务则丢弃。public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }}3.DiscardPolicy:啥都不做,丢弃这个任务 public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }}4.DiscardOldestPolicy:丢弃任务队列中最旧任务public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }}
线程池执行的主要流程
线程池有这样几个重要的东西,核心线程数,最大线程数,阻塞队列,饱和策略。
当我们在不断的添加执行任务时,一开始任务不多,任务是通过核心线程来执行,如果核心线程没有执行完毕,这是会将任务添加到阻塞队列中,等待核心线程来执行,如果继续不断添加任务,阻塞队列添加满了,这时需要启动最大线程数来执行任务,如果再添加任务,超出最大线程数,这是就需要启动饱和策略处理。
如图:
代码体现:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); //1.如果正在工作的线程少于核心线程 if (workerCountOf(c) < corePoolSize) { //添加到工作线程,true参数表示将核心线程添加到工作线程 if (addWorker(command, true)) return; c = ctl.get(); } //2.核心线程在运行,将任务添加到队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //判断一种临界状态 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0)//工作线程为0 addWorker(null, false);//传入一个空任务,在addwork有判断处理,会立马返回false,不会继续执行 } else if (!addWorker(command, false))//3.添加到非核心线程 reject(command);//4.如果不能添加成功,则调用饱和策略 }
private static final int CORE_POOL_SIZE = 3;private static final int MAX_POOL_SIZE = 10;private static final int KEEP_ALIVE_TIME = 3;ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( CORE_POOL_SIZE,//核心线程数 MAX_POOL_SIZE,//最大线程数 KEEP_ALIVE_TIME,//线程非工作状态存活时间 TimeUnit.MINUTES,//单位分钟 new LinkedBlockingQueue(),//链表结构队列,默认为Integer.MAX_VALUE。 Executors.defaultThreadFactory(),//使用java中封装好的线程(如果自己需要定制线程名称,可以自己实现) new ThreadPoolExecutor.CallerRunsPolicy()//饱和策略,用调用者所在的线程来执行任务);
提交任务给线程池,执行(执行有2种方式)
executor(Runnable r) 常用
threadPoolExecutor.execute(new Runnable() { @Override public void run() { //具体任务 }});
submit() 不常用
submit()有3中方法重载。
submit()方法用于提交需要返回值的任务。会返回一个Future类的对象。通过future对象可以判断任务是否执行成功。而通过future的get()方法来获取返回值。
public Future submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask;}public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask;}public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}protected RunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value);}protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask (callable);}
从上面源码可以看出,submit()调用的也是executor(),但是有一个返回值,该返回值就是new FutureTask<T>()
,作用就是验证executor()是否执行成功。
关闭线程池的2种方式
关闭线程池都是遍历线程池中的工作线程,调用Thread的interrupt()方法将其打断或中断。
shutdown():
当线程池调用该方法时,线程池的状态则立刻变成SHUTDOWN状态。此时,则不能再往线程池中添加任何任务,否则将会抛出RejectedExecutionException异常。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN);//:将线程池的状态设置为SHUTDOWN状态 interruptIdleWorkers();//遍历中断线程 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate();}
shutdownNow():
执行该方法,线程池的状态立刻变成STOP状态,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。
它试图终止线程的方法是通过调用Thread.interrupt()方法来实现的,这种方法的作用有限,如果线程中没有sleep 、wait、Condition、定时锁等应用, interrupt()方法是无法中断当前的线程的。所以,ShutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。
public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP);//将线程池状态设置为STOP状态 interruptWorkers();//遍历中断线程 tasks = drainQueue();//返回正在执行或暂停的线程 } finally { mainLock.unlock(); } tryTerminate(); return tasks;}
对于调用哪种方法来关闭线程池,应该有提交到线程池的任务特性决定,通常调用shutdown()
方法来关闭线程池。如果任务不一定要执行完,则可以调用shutdownNow()
转载地址:http://tesvn.baihongyu.com/