Welcome everyone

亲缘性线程池

java 汪明鑫 1686浏览 0评论

JDK中的线程池固然好,但是其不具有亲缘性,也就是当我们向其中顺序投递多个任务后,JDK默认的线程池实现不能保证具有相同属性的任务顺序执行

而亲缘性的线程池可以保证,顺序执行具有相同属性的任务。

 

普通线程池执行多个任务无法保证顺序性,我们来看亲缘池是怎么做到的?

 

引入maven 依赖

<dependency>
      <groupId>com.github.phantomthief</groupId>
      <artifactId>simple-pool</artifactId>
      <version>0.1.17</version>
</dependency>

 

package pers.wmx.springbootfreemarkerdemo;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.assertj.core.util.Lists;

import com.github.phantomthief.pool.KeyAffinityExecutor;

/**
 * @author wangmingxin03
 * Created on 2021-04-15
 */
public class AffinityThreadPool {
    static KeyAffinityExecutor executor =
            KeyAffinityExecutor.newSerializingExecutor(8, 200, "MY-POOL");

    static ExecutorService normalExecutor = Executors.newFixedThreadPool(8);


    public static void main(String[] args) {
        List<Person> personList = mockData();

        // 无法保证顺序性
//        personList.forEach(person -> {
//            normalExecutor.execute(() -> System.out.println(person.getName()));
//        });

        personList.forEach(person -> {
            executor.executeEx(person.getId(), () -> System.out.println(person.getName()));
        });

    }

    private static List<Person> mockData() {
        List<Person> personList = Lists.newArrayList();
        Person p1 = new Person(1, "a");
        Person p2 = new Person(2, "bbb");
        Person p3 = new Person(1, "aaa");
        Person p4 = new Person(1, "aa");
        Person p5 = new Person(2, "b");
        personList.add(p1);
        personList.add(p2);
        personList.add(p3);
        personList.add(p4);
        personList.add(p5);
        return personList;
    }
}

class Person {
    private int id;

    private String name;

    public Person(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

 

我们先看下亲缘池的初始化

KeyAffinityExecutor executor =
KeyAffinityExecutor.newSerializingExecutor(8, 200, "MY-POOL");

 

/**
     * @param parallelism max concurrency for task submitted.
     * @param queueBufferSize max queue size for every executor, 0 means unbounded queue.
     * @param threadName see {@link ThreadFactoryBuilder#setNameFormat(String)}
     */
    @Nonnull
    static <K> KeyAffinityExecutor<K> newSerializingExecutor(int parallelism, int queueBufferSize,
            String threadName) {
        return newKeyAffinityExecutor() //
                .count(parallelism) //
                .executor(new Supplier<ExecutorService>() {

                    private final ThreadFactory threadFactory = new ThreadFactoryBuilder() //
                            .setNameFormat(threadName) //
                            .build();

                    @Override
                    public ExecutorService get() {
                        LinkedBlockingQueue<Runnable> queue;
                        if (queueBufferSize > 0) {
                            queue = new LinkedBlockingQueue<Runnable>(queueBufferSize) {

                                @Override
                                public boolean offer(Runnable e) {
                                    try {
                                        put(e);
                                        return true;
                                    } catch (InterruptedException ie) {
                                        Thread.currentThread().interrupt();
                                    }
                                    return false;
                                }
                            };
                        } else {
                            queue = new LinkedBlockingQueue<>();
                        }
                        
                        // 可以看到构造的线程池的大小为1 !!!
                        return new ThreadPoolExecutor(1, 1, 0L, MILLISECONDS, queue, threadFactory);
                    }
                }) 
                .build();
    }

 

亲缘池构造

@Nonnull
    public <K> KeyAffinityExecutor<K> build() {
        if (shutdownAfterClose) {
            builder.depose(it -> shutdownAndAwaitTermination(it, 1, DAYS));
        }
        builder.ensure();
        KeyAffinityExecutorImpl<K> result = new KeyAffinityExecutorImpl<>(builder::buildInner);
        ALL_EXECUTORS.put(result, wrapStats(result));
        return result;
    }

 

再看下buildInner

KeyAffinityImpl(@Nonnull Supplier<V> supplier, int count,
            @Nonnull ThrowableConsumer<V, Exception> deposeFunc, boolean usingRandom) {
        this.usingRandom = usingRandom;
        this.all = IntStream.range(0, count) //
                .mapToObj(it -> supplier.get()) //
                .map(ValueRef::new) //
                .collect(toList());
        this.deposeFunc = checkNotNull(deposeFunc);
    }

最终是构造了count 个大小为1的线程池

 

并记录了线程池和每个线程池对应的并发度 concurrency

private class ValueRef {

        private final V obj;
        private final AtomicInteger concurrency = new AtomicInteger();

        ValueRef(V obj) {
            this.obj = obj;
        }

        int concurrency() {
            return concurrency.get();
        }
    }

 

然后再看任务执行部分,我们mock了一批数据,如果用普通线程池,我们就没法保证执行的顺序性

我们使用亲缘池,制定一个key, 基于这个key的任务的投递执行保证顺序性

personList.forEach(person -> {
executor.executeEx(person.getId(), () -> System.out.println(person.getName()));
});

我们的key就是id, 也是说id相同的任务投递顺序和执行顺序一样

 

从亲缘池里选择一个线程池去执行当前key对应的任务

default void executeEx(K key, @Nonnull ThrowableRunnable<Exception> task) {
        checkNotNull(task);

        ListeningExecutorService service = select(key);
        boolean addCallback = false;
        try {
            service.execute(() -> {
                try {
                    task.run();
                } catch (Throwable e) { // pass to uncaught exception handler
                    throwIfUnchecked(e);
                    throw new UncheckedExecutionException(e);
                } finally {
                    finishCall(key);
                }
            });
            addCallback = true;
        } finally {
            if (!addCallback) {
                finishCall(key);
            }
        }
    }

 

然后看下核心方法 select(key)

@Nonnull
    @Override
    public V select(K key) {
        KeyRef keyRef = mapping.compute(key, (k, v) -> {
            if (v == null) {
                if (usingRandom) {
                    v = new KeyRef(all.get(ThreadLocalRandom.current().nextInt(all.size())));
                } else {
                    v = all.stream() //
                            .min(comparingInt(ValueRef::concurrency)) //
                            .map(KeyRef::new) //
                            .orElseThrow(IllegalStateException::new);
                }
            }
            v.incrConcurrency();
            return v;
        });
        return keyRef.ref();
    }

mapping 是记录 key -> 线程池 (大小为1 ) 【需要额外存储

从所有线程池中选择一个并发度最小的线程池 (来实现负载均衡)   min(comparingInt(ValueRef::concurrency))

并给当前线程池的并发度 ++

 

一个key的所有任务都映射到一个线程池执行,然后线程池的大小又是1,自然可以保证顺序性。

这里的实现是挑选并发度最小 + 一个额外的存储 map

private final Map<K, KeyRef> mapping = new ConcurrentHashMap<>();

我们是不是也可以自己实现不去记录并发度,也不需要额外的存储记录key到线程池的映射关系

按照某一个固定的hash映射规则来挑选线程池也是阔以的。

 

 

转载请注明:汪明鑫的个人博客 » 亲缘性线程池

喜欢 (0)

说点什么

您将是第一位评论人!

提醒
avatar
wpDiscuz