Welcome everyone

线程池中的奇技淫巧

java 汪明鑫 920浏览 0评论

线程池介绍

见上一篇文章

 

线程池工作原理

【当一个任务被添加进线程池时】

1.线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务

2.线程数量达到了corePoolSize,则将任务移入队列等待

3.队列已满,新建线程(非核心线程)执行任务

4.队列已满,总线程数又达到了 maximumPoolSize,就会由丢弃策略处理

 

 

注意:

在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务

(除非调用了 prestartAllCoreThreads () 或者 prestartCoreThread () 方法,从这 2 个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建 corePoolSize 个线程或者一个线程)

 

线程池核心参数刨析

有多个重载的构造函数

 

 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

 

corePoolSize  maximumPoolSize 不用说了

keepAliveTime 是指非核心线程空闲的时间 (unit 是时间单位),超过空闲时间就会被销毁

 

如果需要指定核心线程也被销毁  allowCoreThreadTimeOut = true

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

 

BlockingQueue 一般传个有界队列ArrayBlockingQueue即可

注意LinkedBlockingQueue 是无界队列 ,可能会导致OOM

 

线程池正确使用方式

public class ThreadPoolConfig {

    /**
     * 机器内核数
     */
    private final static int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors() * 2;

    /**
     * 线程池创建
     */
    private static ExecutorService THREADPOOL = new ThreadPoolExecutor(AVAILABLE_PROCESSORS, 50,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(1024), new ThreadFactoryBuilder()
        .setNameFormat("call-thread-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());

    private ThreadPoolConfig() { }

    /**
     * 获取定长线程池
     *
     * @return
     */
    public static ExecutorService getThreadPool() {
        return THREADPOOL;
    }
}

 

public class ThreadPoolUtil {

    private static volatile ThreadPoolExecutor threadPoolExecutor = null;

    public static ThreadPoolExecutor getThreadPool(){
        if(threadPoolExecutor == null){
            synchronized (ThreadPoolUtil.class){
                if(threadPoolExecutor == null){
                    threadPoolExecutor =
                        new ThreadPoolExecutor(20,30,
                            10, TimeUnit.SECONDS,new LinkedBlockingDeque<>(100),
                            new NameTreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
                }
            }
        }
        return threadPoolExecutor;
    }

    static class NameTreadFactory implements ThreadFactory {

        private final AtomicInteger mThreadNum = new AtomicInteger(1);

        @Override
        public Thread newThread( Runnable r) {
            return new Thread(r, "my-thread-" + mThreadNum.getAndIncrement());
        }
    }


}

 

阿里Java规范 线程/线程池相关

【强制】创建线程或线程池时请指定有意义的线程名称,方便出错时回溯。
正例:
public class TimerTaskThread extends Thread {
public TimerTaskThread() {
super.setName(“TimerTaskThread”);

}
}

 

【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明: 使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决
资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或
者“过度切换”的问题

 

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样
的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
说明: Executors 返回的线程池对象的弊端如下:
1) FixedThreadPool 和 SingleThreadPool:
允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM。
2) CachedThreadPool 和 ScheduledThreadPool:
允许的创建线程数量为 Integer.MAX_VALUE, 可能会创建大量的线程,从而导致 OOM。

 

Executors使用示例

虽然Executors不推荐使用,简单看一下也没什么坏处

public static void main(String[] args) {
		//创建一个线程池
		ExecutorService pool = Executors.newCachedThreadPool();
		for(int i = 1; i < 5; i++){
			pool.execute(new Runnable() {
				@Override
				public void run() {
					System.out.println("thread name: " + Thread.currentThread().getName());
					try {
						Thread.sleep(1000);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			});
		}
		pool.shutdown();
	}

public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService pool = Executors.newFixedThreadPool(4); 
		
		for(int i = 0; i < 10; i++){
			Future<String> submit = pool.submit(new Callable<String>(){
				@Override
				public String call() throws Exception {
					//System.out.println("a");
					Thread.sleep(5000);
					return Thread.currentThread().getName();
				}			   
			   });
			//从Future中get结果,这个方法是会被阻塞的,一直要等到线程任务返回结果 
			System.out.println(submit.get());
		} 
			pool.shutdown();
	}

 

callable 跟runnable的区别:
runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值
callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果

 

SingleThreadExecutor 控制多线程执行顺序

public class Main2 {
    static Thread thread1=new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("thread1...");
        }
    });

    static Thread thread2=new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("thread2...");
        }
    });

    static Thread thread3=new Thread(new Runnable() {
        @Override
        public void run() {
            System.out.println("thread3...");
        }
    });

    static ExecutorService executorService = Executors.newSingleThreadExecutor();   //FIFO  先进先出

    public static void main(String[] args) throws Exception{
        executorService.submit(thread1);
        executorService.submit(thread2);
        executorService.submit(thread3);
        executorService.shutdown();
    }
}

 

 

小demo感受一下

public static void main(String[] args) {
		/**
		 * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
		 * 若大于corePoolSize,则会将任务加入队列,
		 * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
		 * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
		 * 
		 */	
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1, 				//coreSize
				2, 				//MaxSize
				60, 			
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3)			//指定一种队列 (有界队列)
				//new LinkedBlockingQueue<Runnable>()
				, new MyRejected()
				//, new DiscardOldestPolicy()
				);
		
		MyTask mt1 = new MyTask(1, "任务1");
		MyTask mt2 = new MyTask(2, "任务2");
		MyTask mt3 = new MyTask(3, "任务3");
		MyTask mt4 = new MyTask(4, "任务4");
		MyTask mt5 = new MyTask(5, "任务5");
		MyTask mt6 = new MyTask(6, "任务6");
		
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);
		pool.execute(mt4);
		pool.execute(mt5);
		pool.execute(mt6);
		
		pool.shutdown();
		
	}

 

注意:无界队列下,maximumPoolSize 没有意义

 

线程池数据指标获取

 

private static ExecutorService executor = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(100000));

public static void main(String[] args) throws Exception {
    for (int i = 0; i < 100000; i++) {
        executor.execute(() -> {
            System.out.print(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }

    ThreadPoolExecutor tpe = ((ThreadPoolExecutor) executor);

    while (true) {
        System.out.println();

        int queueSize = tpe.getQueue().size();
        System.out.println("当前排队线程数:" + queueSize);

        int activeCount = tpe.getActiveCount();
        System.out.println("当前活动线程数:" + activeCount);

        long completedTaskCount = tpe.getCompletedTaskCount();
        System.out.println("执行完成线程数:" + completedTaskCount);

        long taskCount = tpe.getTaskCount();
        System.out.println("总线程数:" + taskCount);

        Thread.sleep(3000);
    }

}

 

 

转载请注明:汪明鑫的个人博客 » 线程池中的奇技淫巧

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz