Java 线程池执行器 : Updating core pool size dynamically rejects incoming tasks intermittently

jackei 阅读:10 2024-06-20 12:54:19 评论:0

我遇到了一个问题,如果我尝试调整 ThreadPoolExecutor 的大小创建池后,将核心池大小设置为不同的数字,然后间歇性地,某些任务被拒绝并返回 RejectedExecutionException即使我从来没有提交超过 queueSize + maxPoolSize任务数。

我试图解决的问题是扩展 ThreadPoolExecutor它根据线程池队列中的挂起执行调整其核心线程的大小。我需要这个,因为默认情况下是 ThreadPoolExecutor将创建一个新的 Thread仅当队列已满时。

这是一个小型的自包含 Pure Java 8 程序,用于演示该问题。

import static java.lang.Math.max; 
import static java.lang.Math.min; 
import java.util.concurrent.CompletableFuture; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ScheduledExecutorService; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 
public class ThreadPoolResizeTest { 
    public static void main(String[] args) throws Exception { 
        // increase the number of iterations if unable to reproduce 
        // for me 100 iterations have been enough 
        int numberOfExecutions = 100; 
        for (int i = 1; i <= numberOfExecutions; i++) { 
    private static void executeOnce() throws Exception { 
        int minThreads = 1; 
        int maxThreads = 5; 
        int queueCapacity = 10; 
        ThreadPoolExecutor pool = new ThreadPoolExecutor( 
                minThreads, maxThreads, 
                0, TimeUnit.SECONDS, 
                new LinkedBlockingQueue<Runnable>(queueCapacity), 
                new ThreadPoolExecutor.AbortPolicy() 
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads), 
                0, 10, TimeUnit.MILLISECONDS); 
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>(); 
        try { 
            int totalTasksToSubmit = queueCapacity + maxThreads; 
            for (int i = 1; i <= totalTasksToSubmit; i++) { 
                // following line sometimes throws a RejectedExecutionException 
                pool.submit(() -> { 
                    // block the thread and prevent it from completing the task 
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away 
        } finally { 
     * Resize the thread pool if the number of pending tasks are non-zero. 
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) { 
        int pendingExecutions = pool.getQueue().size(); 
        int approximateRunningExecutions = pool.getActiveCount(); 
         * New core thread count should be the sum of pending and currently executing tasks 
         * with an upper bound of maxThreads and a lower bound of minThreads. 
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions)); 

如果我从不提交超过 queueCapacity+maxThreads 的数量,为什么池会抛出 RejectedExecutionException。我从不更改最大线程数,因此根据 ThreadPoolExecutor 的定义,它应该在线程或队列中容纳任务。


关于如何修复 RejectedExecutionException 的任何指示?



在我的示例中,我使用 minThreads = 0、maxThreads = 2 和 queueCapacity = 2 使其更短。
第一个命令被提交,这是在方法 execute 中完成的:

public void execute(Runnable command) { 
    if (command == null) 
        throw new NullPointerException(); 
     * Proceed in 3 steps: 
     * 1. If fewer than corePoolSize threads are running, try to 
     * start a new thread with the given command as its first 
     * task.  The call to addWorker atomically checks runState and 
     * workerCount, and so prevents false alarms that would add 
     * threads when it shouldn't, by returning false. 
     * 2. If a task can be successfully queued, then we still need 
     * to double-check whether we should have added a thread 
     * (because existing ones died since last checking) or that 
     * the pool shut down since entry into this method. So we 
     * recheck state and if necessary roll back the enqueuing if 
     * stopped, or start a new thread if there are none. 
     * 3. If we cannot queue task, then we try to add a new 
     * thread.  If it fails, we know we are shut down or saturated 
     * and so reject the task. 
    int c = ctl.get(); 
    if (workerCountOf(c) < corePoolSize) { 
        if (addWorker(command, true)) 
        c = ctl.get(); 
    if (isRunning(c) && workQueue.offer(command)) { 
        int recheck = ctl.get(); 
        if (! isRunning(recheck) && remove(command)) 
        else if (workerCountOf(recheck) == 0) 
            addWorker(null, false); 
    else if (!addWorker(command, false)) 

对于这个命令 workQueue.offer(command) 而不是 addWorker(null, false) 被执行。

这次 workQueue.offer(command) 被提交,第二个命令被提交。现在队列已满

现在 ScheduledExecutorService 执行 resizeThreadPool 方法,该方法使用 maxThreads 调用 setCorePoolSize。
这是 setCorePoolSize 方法:
 public void setCorePoolSize(int corePoolSize) { 
    if (corePoolSize < 0) 
        throw new IllegalArgumentException(); 
    int delta = corePoolSize - this.corePoolSize; 
    this.corePoolSize = corePoolSize; 
    if (workerCountOf(ctl.get()) > corePoolSize) 
    else if (delta > 0) { 
        // We don't really know how many new threads are "needed". 
        // As a heuristic, prestart enough new workers (up to new 
        // core size) to handle the current number of tasks in 
        // queue, but stop if queue becomes empty while doing so. 
        int k = Math.min(delta, workQueue.size()); 
        while (k-- > 0 && addWorker(null, true)) { 
            if (workQueue.isEmpty()) 

此方法使用 addWorker(null, true) 添加一名 worker 。不,有 2 个工作队列正在运行,最大且队列已满。

第三个命令被提交并失败,因为 workQueue.offer(command) 和 addWorker(command, false) 失败,导致异常:
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0] 
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution( 
at java.util.concurrent.ThreadPoolExecutor.reject( 
at java.util.concurrent.ThreadPoolExecutor.execute( 
at java.util.concurrent.AbstractExecutorService.submit( 
at ThreadPoolResizeTest.executeOnce( 
at ThreadPoolResizeTest.runTest( 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke( 
at sun.reflect.DelegatingMethodAccessorImpl.invoke( 
at java.lang.reflect.Method.invoke( 
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall( 
at org.junit.runners.model.FrameworkMethod.invokeExplosively( 
at org.junit.internal.runners.statements.InvokeMethod.evaluate( 
at org.junit.runners.ParentRunner.runLeaf( 
at org.junit.runners.BlockJUnit4ClassRunner.runChild( 
at org.junit.runners.BlockJUnit4ClassRunner.runChild( 
at org.junit.runners.ParentRunner$ 
at org.junit.runners.ParentRunner$1.schedule( 
at org.junit.runners.ParentRunner.runChildren( 
at org.junit.runners.ParentRunner.access$000( 
at org.junit.runners.ParentRunner$2.evaluate( 
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(