短频快task的java解决方案

程序浅谈 后端 最近

短频快task的java解决方案

看到这个标题,大家一定很好奇,感觉这是一个重复造轮子的事情。java明明已经提供了WorkStealingPool,本身是带窃取能力的。这里就需要讲一下背景。这里主要来自WorkStealingPool的能力缺陷。

WorkStealingPool的能力缺陷java

代码解读
复制代码
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }

java的WorkStealingPool本质是forkjoinpool。他是利用了forkjoinpool的窃取任务的能力,来达到窃取的效果。forkjoinpool在实现复杂场景的fork,join的过程,也带来了一个问题,就是他无法做到中断程序。例如task1,拆出了task2,task3,task4。现在task2正在被执行,task3还在队列里,task4已经被窃取走了。现在对整体任务1发起中断。正确的中断是task2中断,task3取消,task4中断。最终对于task1而言是被中断了。这里就有了一个问题,task4被窃取走了,怎么知道那个线程需要被中断呢?这个问题forkjoinpool也没实现。在forkjointask的实现过程中也体现了这点。java

代码解读
复制代码
public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; }

在取消任务的方法中mayInterruptIfRunning直接没有被使用。注释中直接写了这个参数不生效。arduino

代码解读
复制代码
Params: mayInterruptIfRunning – this value has no effect in the default implementation because interrupts are not used to control cancellation.

所以自带的WorkStealingPool是无法发起中断的。只能发起取消。

普通线程池为什么满足不了场景

java自带的线程数是一个生产消费模型,生产者就是我们提交的任务,消费者线程池的线程,用来执行任务。这个从模型上看还是一个公平模型,那个线程执行完了就从队列里获取任务,这样算力能力也不会被浪费。但是这个地方有个前提,就是任务执行的时间一定是得大于从队列获取的时间。对于短频快的任务,获取任务的损耗就会变得特别明显。线程池的队列必须是一个阻塞队列。这里就涉及到锁的竞争,如下的代码是线程池获取任务的地方workQueue就是阻塞队列,poll,take都会涉及到锁的竞争。java

代码解读
复制代码
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; }

了解了场景之后,我们想这个场景的优化方式,其实之一就是批处理,反正任务执行快,把多个任务合并成一个,这样每个线程执行的时候就是按照一批来的。锁竞争的的损耗被降低了。
上面的批量的方案运行良好的情况每个任务的执行时间都差不多。但是现实情况是无法保证的,可能因为一些不合理的操作,导致某一个任务的执行时间变长,或者某一个批次的每个任务都比其他的多一些,最终导致了整体的执行时间变长。在整体的观测线程池的线程运行情况,就会发现某一个线程一直在执行,但是其他的线程还有空闲,造成了线程算力的浪费。

这里在看窃取的能力,似乎更好的满足了这里的诉求。

基于场景的窃取

上面讲述了场景其实根因来自,批处理可能会触发不均衡的情况,我们只要在不均衡的时候,其他的空闲线程过来拿任务来跑。就满足的我们的场景。这里就有2个点,需要我们设计。

  1. 窃取的逻辑。
  2. 窃取的损耗

任务窃取的逻辑

这里我们只要在执行任务的时候,把执行的线程和同一批的task做关联即可。在创建任务的时候,把拆分好的任务和当前任务都设置在task里。线程执行完任务把task的状态设置为完成。在当前执行完成之后,在任务列表进行过滤,找出未完成的任务,窃取任务在自己的线程里执行。直到没有任务可以被执行为止。

窃取的损耗

这里我们可以参考WorkStealingPool,把任务队列设置为双端队列。当前线程的任务从队列的头获取,其他任务过来窃取的过程,从队列尾部窃取。这里优化不用引入阻塞队列,否则自身的任务执行也要过一遍锁。我们用更轻量级的方式。cas获取。这里就得借用unsafe的能力,对数组对象做cas。

例如下面的展示
自身任务的执行
|1|2|3|4|5|

执行的过程就是设置为null

|null|2|3|4|5|

其他线程来窃取就变成了

|null|2|3|null|null|

任务线程和窃取线程就避免了重量的锁操作,大家都是cas获取,当cas失败,证明已经没有可以窃取的任务了。

通过上面2个设计,就可以完成了任务的快速执行,还能线程的算力不浪费。而且在取消的任务的时候,保证更多的任务再被执行。

总结

如果任务要全部不中断的执行完,可以使用 WorkStealingPool。
如果任务执行时间长(大于锁的损耗),可以使用threadpool。
如果任务执行短,并且还需要被中断,可以在threadpool之上,进行二次的封装。设计任务窃取的逻辑。重点设计是窃取方式以及性能损耗。

Apipost 私有化火热进行中

评论