JDK中的线程池固然好,但是其不具有亲缘性,也就是当我们向其中顺序投递多个任务后,JDK默认的线程池实现不能保证具有相同属性的任务顺序执行,
而亲缘性的线程池可以保证,顺序执行具有相同属性的任务。
普通线程池执行多个任务无法保证顺序性,我们来看亲缘池是怎么做到的?
引入maven 依赖
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>simple-pool</artifactId>
<version>0.1.17</version>
</dependency>
package pers.wmx.springbootfreemarkerdemo;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.assertj.core.util.Lists;
import com.github.phantomthief.pool.KeyAffinityExecutor;
/**
* @author wangmingxin03
* Created on 2021-04-15
*/
public class AffinityThreadPool {
static KeyAffinityExecutor executor =
KeyAffinityExecutor.newSerializingExecutor(8, 200, "MY-POOL");
static ExecutorService normalExecutor = Executors.newFixedThreadPool(8);
public static void main(String[] args) {
List<Person> personList = mockData();
// 无法保证顺序性
// personList.forEach(person -> {
// normalExecutor.execute(() -> System.out.println(person.getName()));
// });
personList.forEach(person -> {
executor.executeEx(person.getId(), () -> System.out.println(person.getName()));
});
}
private static List<Person> mockData() {
List<Person> personList = Lists.newArrayList();
Person p1 = new Person(1, "a");
Person p2 = new Person(2, "bbb");
Person p3 = new Person(1, "aaa");
Person p4 = new Person(1, "aa");
Person p5 = new Person(2, "b");
personList.add(p1);
personList.add(p2);
personList.add(p3);
personList.add(p4);
personList.add(p5);
return personList;
}
}
class Person {
private int id;
private String name;
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
我们先看下亲缘池的初始化
KeyAffinityExecutor executor =
KeyAffinityExecutor.newSerializingExecutor(8, 200, "MY-POOL");
/**
* @param parallelism max concurrency for task submitted.
* @param queueBufferSize max queue size for every executor, 0 means unbounded queue.
* @param threadName see {@link ThreadFactoryBuilder#setNameFormat(String)}
*/
@Nonnull
static <K> KeyAffinityExecutor<K> newSerializingExecutor(int parallelism, int queueBufferSize,
String threadName) {
return newKeyAffinityExecutor() //
.count(parallelism) //
.executor(new Supplier<ExecutorService>() {
private final ThreadFactory threadFactory = new ThreadFactoryBuilder() //
.setNameFormat(threadName) //
.build();
@Override
public ExecutorService get() {
LinkedBlockingQueue<Runnable> queue;
if (queueBufferSize > 0) {
queue = new LinkedBlockingQueue<Runnable>(queueBufferSize) {
@Override
public boolean offer(Runnable e) {
try {
put(e);
return true;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
return false;
}
};
} else {
queue = new LinkedBlockingQueue<>();
}
// 可以看到构造的线程池的大小为1 !!!
return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory);
}
})
.build();
}
亲缘池构造
@Nonnull
public <K> KeyAffinityExecutor<K> build() {
if (shutdownAfterClose) {
builder.depose(it -> shutdownAndAwaitTermination(it, 1, DAYS));
}
builder.ensure();
KeyAffinityExecutorImpl<K> result = new KeyAffinityExecutorImpl<>(builder::buildInner);
ALL_EXECUTORS.put(result, wrapStats(result));
return result;
}
再看下buildInner
KeyAffinityImpl(@Nonnull Supplier<V> supplier, int count,
@Nonnull ThrowableConsumer<V, Exception> deposeFunc, boolean usingRandom) {
this.usingRandom = usingRandom;
this.all = IntStream.range(0, count) //
.mapToObj(it -> supplier.get()) //
.map(ValueRef::new) //
.collect(toList());
this.deposeFunc = checkNotNull(deposeFunc);
}
最终是构造了count
个大小为1的线程池
并记录了线程池和每个线程池对应的并发度 concurrency
private class ValueRef {
private final V obj;
private final AtomicInteger concurrency = new AtomicInteger();
ValueRef(V obj) {
this.obj = obj;
}
int concurrency() {
return concurrency.get();
}
}
然后再看任务执行部分,我们mock了一批数据,如果用普通线程池,我们就没法保证执行的顺序性
我们使用亲缘池,制定一个key, 基于这个key的任务的投递执行保证顺序性
personList.forEach(person -> {
executor.executeEx(person.getId(), () -> System.out.println(person.getName()));
});
我们的key就是id, 也是说id相同的任务投递顺序和执行顺序一样
从亲缘池里选择一个线程池去执行当前key对应的任务
default void executeEx(K key, @Nonnull ThrowableRunnable<Exception> task) {
checkNotNull(task);
ListeningExecutorService service = select(key);
boolean addCallback = false;
try {
service.execute(() -> {
try {
task.run();
} catch (Throwable e) { // pass to uncaught exception handler
throwIfUnchecked(e);
throw new UncheckedExecutionException(e);
} finally {
finishCall(key);
}
});
addCallback = true;
} finally {
if (!addCallback) {
finishCall(key);
}
}
}
然后看下核心方法 select(key)
@Nonnull
@Override
public V select(K key) {
KeyRef keyRef = mapping.compute(key, (k, v) -> {
if (v == null) {
if (usingRandom) {
v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size())));
} else {
v = all.stream() //
.min(comparingInt(ValueRef::concurrency)) //
.map(KeyRef::new) //
.orElseThrow(IllegalStateException::new);
}
}
v.incrConcurrency();
return v;
});
return keyRef.ref();
}
mapping
是记录 key -> 线程池 (大小为1 ) 【需要额外存储】
从所有线程池中选择一个并发度最小的线程池 (来实现负载均衡) min(comparingInt(ValueRef::concurrency))
并给当前线程池的并发度 ++
一个key的所有任务都映射到一个线程池执行,然后线程池的大小又是1,自然可以保证顺序性。
这里的实现是挑选并发度最小 + 一个额外的存储 map
private final Map<K, KeyRef> mapping = new ConcurrentHashMap<>();
我们是不是也可以自己实现不去记录并发度,也不需要额外的存储记录key到线程池的映射关系
按照某一个固定的hash映射规则来挑选线程池也是阔以的。
说点什么
您将是第一位评论人!