你知道java吗,java里的
00-1010什么是分叉连接?ForjoinTask任务ForkJoinPool线程池工作窃取算法构造方法提交方法创建工人(线程)示例:ForkJoinTask实现归并排序ForkJoin计算过程前言:
Forjoin是Java7中加入的新特性,可能大家都不熟悉,但是Java8中的Stream并行流依赖于ForkJoin。在ForkJoin系统中,最关键的是ForkJoinTask和ForkJoinPool。Fork Join就是利用分而治之的思想,将一个大任务按照一定的规则分割成小任务,然后通过Join进行聚合。
目录
ForkJoin字面意思是分叉,Join是组合的意思。我们可以理解为将一个大任务拆分成小任务进行计算和求解,最后将小任务的结果组合起来,找到大任务的解。我们可以把这些拆分的小任务交给不同的线程进行计算,这是一种分布式计算的思想。这类似于MapReduce,大数据中的分布式离线计算。ForkJoin最经典的应用之一是Java8中的Stream。我们知道流分为串行流和并行流,其中并行流依靠ForkJoin实现并行处理。
我们来看看核心的ForkJoinTask和ForkJoinPool。
什么是ForkJoin?
ForkJoinTask本身并不复杂,和异步任务计算的FutureTask一样实现了Future接口。我们在上一篇文章中谈到了FutureTask,所以你可以从源代码中读取——Java来看看异步任务计算的FutureTask。
我们先来看看ForkJoinTask的核心源代码,任务是如何用分治法计算的。
KjoinTask的核心是fork()和join()方法。
fork()
判断当前线程是否为ForkJoinWorkerThread线程,直接将当前线程推送到工作队列。是否调用ForkJoinPool的externalPush方法在ForkJoinPool中构建一个静态的common对象,这里调用common的externalPush()。
join()
调用doJoin()方法,等待线程执行完成。public Final forkjointaskfork(){ thread t;if((t=thread . current thread())ForkJoinWorkerThread的instance)((ForkJoinWorkerThread)t). work queue . push(this);else forkjoinpool.common . external push(this);还这个;} public final V join(){ int s;if ((s=doJoin() DONE_MASK)!=正常)报告异常;返回getraw result();} private int Dojo in(){ int s;螺纹t;ForkJoinWorkerThread wtForkJoinPool。工作队列w;return (s=status) 0?s :((t=thread . current thread())ForkJoinWorkerThread的实例)?(w=(wt=(ForkJoinWorkerThread)t)。工作队列)。tryUnpush(this) (s=doExec()) 0?s : wt.pool.awaitJoin(w,this,0L): externalAwaitDone();}//获取结果的方法由子类public abstract V getRawResult()实现;递归任务
是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。
public abstract class RecursiveTask<V> extends ForkJoinTask<V> { private static final long serialVersionUID = 5232453952276485270L; V result; protected abstract V compute(); public final V getRawResult() { return result; } protected final void setRawResult(V value) { result = value; } protected final boolean exec() { result = compute(); return true; }}
ForkJoinPool 线程池
ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从java源码分析线程池(池化技术)的实现原理
ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。
工作窃取算法
ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去窃取其它任务队列中的任务,这就是所谓的工作窃取算法。
对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足工作窃取的需求,任务队列应该支持从队尾出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。
构造方法
首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?
int parallelism 可并行级别(不代表最多存在的线程数量)ForkJoinWorkerThreadFactory factory 线程创建工厂UncaughtExceptionHandler handler 异常捕获处理器boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式
public ForkJoinPool() { this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), defaultForkJoinWorkerThreadFactory, null, false); }public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); }public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { this(checkParallelism(parallelism), checkFactory(factory), handler, asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-"); checkPermission(); }
提交方法
下面我们看一下提交任务的方法:
externalPush
这个方法我们很眼熟,它正是在fork的时候如果当前线程不是ForkJoinWorkerThread,新提交任务也是会通过这个方法去执行任务。由此可见,fork就是新建一个子任务进行提交。
externalSubmit
是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。
signalWork
(ws, q)是发送工作信号,让工作队列进行运转。
public ForkJoinTask<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask.AdaptedRunnableAction(task); externalPush(job); return job; } final void externalPush(ForkJoinTask<?> task) { WorkQueue[] ws; WorkQueue q; int m; int r = ThreadLocalRandom.getProbe(); int rs = runState; if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 && (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a; int am, n, s; if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) { int j = ((am & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); U.putOrderedInt(q, QLOCK, 0); if (n <= 1) signalWork(ws, q); return; } U.compareAndSwapInt(q, QLOCK, 1, 0); } externalSubmit(task); } private void externalSubmit(ForkJoinTask<?> task) { int r; // initialize callers probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue[] ws; WorkQueue q; int rs, m, k; boolean move = false; if ((rs = runState) < 0) { tryTerminate(false, false); // help terminate throw new RejectedExecutionException(); } else if ((rs & STARTED) == 0 // initialize ((ws = workQueues) == null (m = ws.length - 1) < 0)) { int ns = 0; rs = lockRunState(); try { if ((rs & STARTED) == 0) { U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong()); // create workQueues array with size a power of two int p = config & SMASK; // ensure at least 2 slots int n = (p > 1) ? p - 1 : 1; n = n >>> 1; n = n >>> 2; n = n >>> 4; n = n >>> 8; n = n >>> 16; n = (n + 1) << 1; workQueues = new WorkQueue[n]; ns = STARTED; } } finally { unlockRunState(rs, (rs & ~RSLOCK) ns); } } else if ((q = ws[k = r & m & SQMASK]) != null) { if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) { ForkJoinTask<?>[] a = q.array; int s = q.top; boolean submitted = false; // initial submission or resizing try { // locked version of push if ((a != null && a.length > s + 1 - q.base) (a = q.growArray()) != null) { int j = (((a.length - 1) & s) << ASHIFT) + ABASE; U.putOrderedObject(a, j, task); U.putOrderedInt(q, QTOP, s + 1); submitted = true; } } finally { U.compareAndSwapInt(q, QLOCK, 1, 0); } if (submitted) { signalWork(ws, q); return; } } move = true; // move on failure } else if (((rs = runState) & RSLOCK) == 0) { // create new queue q = new WorkQueue(this, null); q.hint = r; q.config = k SHARED_QUEUE; q.scanState = INACTIVE; rs = lockRunState(); // publish index if (rs > 0 && (ws = workQueues) != null && k < ws.length && ws[k] == null) ws[k] = q; // else terminated unlockRunState(rs, rs & ~RSLOCK); } else move = true; // move if busy if (move) r = ThreadLocalRandom.advanceProbe(r); } }
创建工人(线程)
提交任务后,通过signalWork
(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。
final void signalWork(WorkQueue[] ws, WorkQueue q) { long c; int sp, i; WorkQueue v; Thread p; while ((c = ctl) < 0L) { // too few active if ((sp = (int)c) == 0) { // no idle workers if ((c & ADD_WORKER) != 0L) // too few workers tryAddWorker(c); break; } if (ws == null) // unstarted/terminated break; if (ws.length <= (i = sp & SMASK)) // terminated break; if ((v = ws[i]) == null) // terminating break; int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState int d = sp - v.scanState; // screen CAS long nc = (UC_MASK & (c + AC_UNIT)) (SP_MASK & v.stackPred); if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) { v.scanState = vs; // activate v if ((p = v.parker) != null) U.unpark(p); break; } if (q != null && q.base == q.top) // no more work break; } } private void tryAddWorker(long c) { boolean add = false; do { long nc = ((AC_MASK & (c + AC_UNIT)) (TC_MASK & (c + TC_UNIT))); if (ctl == c) { int rs, stop; // check if terminating if ((stop = (rs = lockRunState()) & STOP) == 0) add = U.compareAndSwapLong(this, CTL, c, nc); unlockRunState(rs, rs & ~RSLOCK); if (stop != 0) break; if (add) { createWorker(); break; } } } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0); } private boolean createWorker() { ForkJoinWorkerThreadFactory fac = factory; Throwable ex = null; ForkJoinWorkerThread wt = null; try { if (fac != null && (wt = fac.newThread(this)) != null) { wt.start(); return true; } } catch (Throwable rex) { ex = rex; } deregisterWorker(wt, ex); return false; } final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) { WorkQueue w = null; if (wt != null && (w = wt.workQueue) != null) { WorkQueue[] ws; // remove index from array int idx = w.config & SMASK; int rs = lockRunState(); if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w) ws[idx] = null; unlockRunState(rs, rs & ~RSLOCK); } long c; // decrement counts do {} while (!U.compareAndSwapLong (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) (TC_MASK & (c - TC_UNIT)) (SP_MASK & c)))); if (w != null) { w.qlock = -1; // ensure set w.transferStealCount(this); w.cancelAll(); // cancel remaining tasks } for (;;) { // possibly replace WorkQueue[] ws; int m, sp; if (tryTerminate(false, false) w == null w.array == null (runState & STOP) != 0 (ws = workQueues) == null (m = ws.length - 1) < 0) // already terminating break; if ((sp = (int)(c = ctl)) != 0) { // wake up replacement if (tryRelease(c, ws[sp & m], AC_UNIT)) break; } else if (ex != null && (c & ADD_WORKER) != 0L) { tryAddWorker(c); // create replacement break; } else // dont need replacement break; } if (ex == null) // help clean on way out ForkJoinTask.helpExpungeStaleExceptions(); else // rethrow ForkJoinTask.rethrow(ex); } public static interface ForkJoinWorkerThreadFactory { public ForkJoinWorkerThread newThread(ForkJoinPool pool); } static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } protected ForkJoinWorkerThread(ForkJoinPool pool) { // Use a placeholder until a useful name can be set in registerWorker super("aForkJoinWorkerThread"); this.pool = pool; this.workQueue = pool.registerWorker(this); } final WorkQueue registerWorker(ForkJoinWorkerThread wt) { UncaughtExceptionHandler handler; wt.setDaemon(true); // configure thread if ((handler = ueh) != null) wt.setUncaughtExceptionHandler(handler); WorkQueue w = new WorkQueue(this, wt); int i = 0; // assign a pool index int mode = config & MODE_MASK; int rs = lockRunState(); try { WorkQueue[] ws; int n; // skip if no array if ((ws = workQueues) != null && (n = ws.length) > 0) { int s = indexSeed += SEED_INCREMENT; // unlikely to collide int m = n - 1; i = ((s << 1) 1) & m; // odd-numbered indices if (ws[i] != null) { // collision int probes = 0; // step by approx half n int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; while (ws[i = (i + step) & m] != null) { if (++probes >= n) { workQueues = ws = Arrays.copyOf(ws, n <<= 1); m = n - 1; probes = 0; } } } w.hint = s; // use as random seed w.config = i mode; w.scanState = i; // publication fence ws[i] = w; } } finally { unlockRunState(rs, rs & ~RSLOCK); } wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1))); return w; }
例:ForkJoinTask实现归并排序
这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。
package com.zhj.interview;import java.util.*;import java.util.concurrent.ExecutionException;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class Test16 { public static void main(String[] args) throws ExecutionException, InterruptedException { int[] bigArr = new int[10000000]; for (int i = 0; i < 10000000; i++) { bigArr[i] = (int) (Math.random() * 10000000); } ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(bigArr); long start = System.currentTimeMillis(); forkJoinPool.submit(task).get(); long end = System.currentTimeMillis(); System.out.println("耗时:" + (end-start));}}class MyForkJoinTask extends RecursiveTask<int[]> { private int source[]; public MyForkJoinTask(int source[]) { if (source == null) { throw new RuntimeException("参数有误!!!"); } this.source = source; } @Override protected int[] compute() { int l = source.length; if (l < 2) { return Arrays.copyOf(source, l); } if (l == 2) { if (source[0] > source[1]) { int[] tar = new int[2]; tar[0] = source[1]; tar[1] = source[0]; return tar; } else { return Arrays.copyOf(source, l); } } if (l > 2) { int mid = l / 2; MyForkJoinTask task1 = new MyForkJoinTask(Arrays.copyOf(source, mid)); task1.fork(); MyForkJoinTask task2 = new MyForkJoinTask(Arrays.copyOfRange(source, mid, l)); task2.fork(); int[] res1 = task1.join(); int[] res2 = task2.join(); int tar[] = merge(res1, res2); return tar; } return null; }// 合并数组 private int[] merge(int[] res1, int[] res2) { int l1 = res1.length; int l2 = res2.length; int l = l1 + l2; int tar[] = new int[l]; for (int i = 0, i1 = 0, i2 = 0; i < l; i++) { int v1 = i1 >= l1 ? Integer.MAX_VALUE : res1[i1]; int v2 = i2 >= l2 ? Integer.MAX_VALUE : res2[i2]; // 如果条件成立,说明应该取数组array1中的值 if(v1 < v2) { tar[i] = v1; i1++; } else { tar[i] = v2; i2++; } } return tar; }}
ForkJoin计算流程
通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。
到此这篇关于一文带你了解Java中的ForkJoin的文章就介绍到这了,更多相关Java中的ForkJoin内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。