springboot 多线程池,java线程池阻塞队列
00-1010一.案例情景二。使用等级III。本实施例的描述1。正在接收web请求2。正在处理后台任务3。关系四的描述。1.代码:订单控制器2。flowstarter流程启动器3。流程经理流程经理4。线程池容器5。线程执行器6。StepHandler业务处理员7。阻塞队列7.1流队列7.2队列工具7.3常量工具8。任务模型8.1步骤模型8.2步骤结果9。业务数据模型9.1 OrderInfo9.2 ResultObj10。测试10.1 web请求10.2后台任务日志版本:Spring Boot 2.6.3
00-
目录
1web web端接收生成任务A的restful请求,并将任务放入queue _ A。
线程池A的任务线程从Queue_A中取出任务,处理后放入Queue_B。
线程池B的3个任务线程从Queue_B中取出任务,处理后入库。
该示例使用两个任务步骤来根据需要扩展任务链。
00-1010 java.util.linked hashmap,双链表。
Java . util . concurrent . blocking queue,阻塞队列接口。
Java。util。并发。LinkedBlockingQueue,阻塞队列实现类。
Java.util.concurrent.countdown锁存器,线程计数器。
Java . util . concurrent . locks . reentrant lock,可以重新锁定。
一、案例场景
二、使用类
OrderController接收web请求,将业务数据封装成task对象,写入queue _ a,Web请求结束,立即返回。
三、本例说明
流量启动器流程启动器
管理流程管理器、创建流程管理器和启动流程管理器。创建一个线程池容器StepContainer,并指定队列、线程池中的线程数和业务处理程序。
FlowManager流程经理
管理线程池容器步骤容器。创建线程池容器,启动线程池容器,关闭线程池容器,并在线程池容器之间传输数据。使用LinkedHashMap在一个进程中维护多个线程池容器。
StepContainer线程池容器
创建一个线程池,启动线程执行器,初始化业务处理程序,读写队列。使用LinkedHashMap在一个流程中维护多个StepExecutor。
线程执行器
执行抽象的公共业务逻辑。实现线程Runnable接口。调用StepHandler实现类的execute来执行具体的业务逻辑。
StepHandler商业处理器处理器
该业务是在StepHandler的实现类execute中实现的。
任务模型对象步骤模型和执行结果对象步骤结果
每一个具体的业务数据必须被封装成一个任务模型对象StepModel,而执行结果作为一个执行结果对象StepResult,这样它才能在线程池和队列中流动。
00-1010一个FlowStarter可以启动一个或多个FlowManager。支持一对多和一对一,按需扩展。
FlowManager对应于一个业务流程。业务流程可以分为多个步骤。一个步骤对应一个线程池容器StepContainer。线程池容器StepContainer启动多个线程执行器StepExecutor。效果是并发执行任务。
业务流程分为几个步骤,数据在每个步骤之间流动。通过使用任务模型StepModel中的状态标识isFinished、isPutInQueueAgain、isPutInQueueNext字段来分析任务流。使用StepModel的StepResult的nextStepName字段来标识流向的线程池容器。
1.接收web请求
2.后台任务处理
OrderController,接收请求,封装任务,写队列。
@ Slf4j @ rest controller @ request mapping(/order )公共类OrderController { @Po
stMapping("/f1") public Object f1(@RequestBody Object obj) { log.info("OrderController->f1,接收参数,obj = " + obj.toString()); Map objMap = (Map) obj; OrderInfo orderInfo = new OrderInfo(); orderInfo.setUserName((String) objMap.get("userName")); orderInfo.setTradeName((String) objMap.get("tradeName")); orderInfo.setOrderTime(System.currentTimeMillis()); LinkedBlockingQueue<StepModel> queueA = FlowQueue.getBlockingQueue("QUEUE_A"); QueueUtils.putStepPutInQueue(queueA,orderInfo); log.info("OrderController->f1,返回." ); return ResultObj.builder().code("200").message("成功").build(); }}
2.FlowStarter流程启动器
FlowStarter,后台任务线程池和线程启动。实现InitializingBean了接口。那么在spring初始化化bean完成后,就能触发启动线程池和线程。
@Slf4j@Servicepublic class FlowStarter implements InitializingBean { @Override public void afterPropertiesSet() throws Exception { log.info("FlowWorker创建流程."); FlowManager flowManager = new FlowManager(); flowManager.buildContainer(ConstantUtils.STEP_01,5, FlowQueue.getBlockingQueue("QUEUE_A"), Step01Handler.class ); flowManager.buildContainer(ConstantUtils.STEP_02,5, FlowQueue.getBlockingQueue("QUEUE_B"), Step02Handler.class ); flowManager.startContainers(); log.info("FlowWorker启动流程完成."); }}
3.FlowManager流程管理器
一个FlowManager流程管理器,维护多个线程池容器StepContainer,共同完成一个流程的多个步骤。
public class FlowManager { // 管理器名称 private String name; // 管理线程池容器 private Map<String, StepContainer> stepContainerMap = new LinkedHashMap<>(); public FlowManager() {} // 创建线程池容器 public void buildContainer(String name, int poolSize, BlockingQueue<StepModel> queue, Class<? extends StepHandler> handlerClazz) { StepContainer stepWorker = new StepContainer(); stepWorker.createThreadPool(poolSize, queue, handlerClazz); stepWorker.setName(name); stepWorker.setFlowManager(this); this.stepContainerMap.put(name, stepWorker); } // 启动线程池容器 public void startContainers() { for (StepContainer stepContainer : this.stepContainerMap.values()) { stepContainer.startRunExecutor(); } } // 关闭线程池容器 public void stopContainers() { for (StepContainer stepContainer : this.stepContainerMap.values()) { stepContainer.stopRunExecutor(); } this.stepContainerMap.clear(); } // 任务放入下一个线程池 public boolean sendToNextContainer(String nextStepName, Object obj) { if (nextStepName != null && !StringUtils.equals(nextStepName, "")) { if (this.stepContainerMap.containsKey(nextStepName)) { this.stepContainerMap.get(nextStepName).putStepInQueue(obj); return true; } else { return false; } } else { return false; } } public String getName() { return name; }}
4.StepContainer线程池容器
StepContainer线程池容器,维护多个线程执行器StepExecutor,实现多线程异步完成每个独立任务。
@Slf4jpublic class StepContainer { // 线程池名称 private String name; // 线程池 private ExecutorService threadPool; // 线程数目 private int nThreads = 0; // 线程处理业务handler类 private Class handlerClazz; // 线程处理业务队列 private BlockingQueue<StepModel> queue = null; // 线程池内线程管理 private Map<String, StepExecutor> stepExecutorMap = new LinkedHashMap<>(); // 线程池运行状态 private boolean isRun = false; // 线程池管理器 private FlowManager flowManager = null; // 构造函数 public StepContainer() {} // 创建线程池 public boolean createThreadPool(int nThreads, BlockingQueue<StepModel> queue, Class<? extends StepHandler> handlerClazz) { try { this.nThreads = nThreads; this.queue = queue; this.handlerClazz = handlerClazz; this.threadPool = Executors.newFixedThreadPool(this.nThreads, new ThreadFactory() { @Override public Thread newThread(Runnable runnable) { return new Thread(runnable); } }); } catch (Exception e) { e.printStackTrace(); return false; } return true; } // 启动线程 public void startRunExecutor() { if (!this.isRun) { if (this.handlerClazz != null) { log.info("线程池: " + this.name + ",启动,加载线程Executor."); StepExecutor stepExecutor; String executorName = ""; for (int num = 0; num < this.nThreads; num++) { try { executorName = this.name + "_" + (num + 1); StepHandler stepHandler = (StepHandler) createStepHandler(this.handlerClazz); stepExecutor = new StepExecutor(executorName, this.queue, stepHandler, this); this.threadPool.execute(stepExecutor); this.stepExecutorMap.put(executorName, stepExecutor); } catch (Exception e) { e.printStackTrace(); } } this.isRun = true; } } } // 关闭线程 public void stopRunExecutor() { if (isRun) { Iterator iterator = this.stepExecutorMap.values().iterator(); while (iterator.hasNext()) { StepExecutor stepExecutor = (StepExecutor) iterator.next(); stepExecutor.stop(); } this.stepExecutorMap.clear(); this.isRun = false; } } // 从队列获取任务 public StepModel getStepFromQueue() { StepModel stepModel = null; synchronized (this.queue) { try { if (this.queue.size() > 0) { stepModel = this.queue.take(); } } catch (Exception e) { log.info("从队列获取任务异常."); e.printStackTrace(); } } return stepModel; } // 任务放入队列 public void putStepInQueue(Object obj) { try { StepModel stepModel = new StepModel(obj); stepModel.setPutInQueueTime(System.currentTimeMillis()); this.queue.put(stepModel); } catch (InterruptedException e) { log.info("任务放入队列异常."); e.printStackTrace(); } } // 重新放入 public void putStepInQueueAgain(StepModel stepModel) { stepModel.setFinished(false); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); try { this.queue.put(stepModel); } catch (InterruptedException e) { log.info("任务重新放入队列异常."); e.printStackTrace(); } } // 清空队列 public void clearQueue() { if (this.queue != null) { this.queue.clear(); } } // 初始化实例对象 public Object createStepHandler(Class clazz) throws InstantiationException, IllegalAccessException { Object object = clazz.newInstance(); return object; } public String getName() { return name; } public void setName(String name) { this.name = name; } public FlowManager getFlowManager() { return flowManager; } public void setFlowManager(FlowManager flowManager) { this.flowManager = flowManager; }}
5.StepExecutor线程执行器
StepExecutor线程执行器,实现Runnable接口。线程执行单元通用逻辑,具体业务逻辑通过调用StepHandler的execute方法实现。
@Slf4jpublic class StepExecutor implements Runnable { // 执行器名称 private String name; // 线程执行的任务 private StepModel stepModel; // 线程执行的队列 private BlockingQueue<StepModel> queue; // 线程执行的业务处理逻辑 private Object stepHandler; // 线程运行状态 private volatile boolean isRun = false; // 线程开启(True)和关闭(False) private volatile boolean isClose = false; // 线程隶属容器 private StepContainer stepContainer; // 线程计数器(关闭线程使用) private CountDownLatch countDownLatch = null; public StepExecutor() {} public StepExecutor(String name, BlockingQueue<StepModel> queue, StepHandler stepHandler, StepContainer stepContainer) { this.name = name; this.queue = queue; this.stepHandler = stepHandler; this.stepContainer = stepContainer; } @Override public void run() { this.isRun = true; this.countDownLatch = new CountDownLatch(1); // 没收到关闭信号,则循环运行 while (!this.isClose) { this.stepModel = null; String threadName = "【线程池:" + this.stepContainer.getName() + ",线程:" + Thread.currentThread().getName() + "】"; // 循环运行,为防止中断和卡主,需捕获异常 try { StepHandler stepHandler = (StepHandler) this.stepHandler; this.stepModel = this.stepContainer.getStepFromQueue(); if (this.stepModel != null) { log.info(threadName + ",处理任务."); this.stepModel.getStepResultList().clear(); stepHandler.execute(this.stepModel); // 执行完成后结果数据 List<StepResult> stepResultList = this.stepModel.getStepResultList(); boolean isFinished = this.stepModel.isFinished(); boolean isPutInQueueAgain = this.stepModel.isPutInQueueAgain(); boolean isPutInQueueNext = this.stepModel.isPutInQueueNext(); if (isFinished && !isPutInQueueAgain && !isPutInQueueNext) { log.info(threadName + ",任务结束."); } if (!isFinished && isPutInQueueAgain && !isPutInQueueNext) { log.info(threadName + ",任务在本步骤未完成,重新放队列."); this.stepContainer.putStepInQueueAgain(this.stepModel); } if (!isFinished && !isPutInQueueAgain && isPutInQueueNext) { int resultNum = stepResultList.size(); if (resultNum > 0) { for (StepResult stepResult : stepResultList) { log.info(threadName + ",任务在本步骤已经完成,发送给下一个线程池: " + stepResult.getNextStepName() + ",执行."); this.stepContainer.getFlowManager().sendToNextContainer( stepResult.getNextStepName(), stepResult.getResult()); } } } } else { threadToSleep(1000 * 3L); } } catch (Exception e) { log.info("执行器异常."); e.printStackTrace(); this.stepContainer.putStepInQueueAgain(this.stepModel); } } // 跳出循环后,线程计数减1 this.countDownLatch.countDown(); this.isRun = false; } public void stop() { this.isClose = true; if (this.countDownLatch != null) { while (this.countDownLatch.getCount() > 0L) { try { this.countDownLatch.await(); } catch (InterruptedException e) { log.info("线程关闭异常."); e.printStackTrace(); } } } this.isClose = false; } public void threadToSleep(long time) { try { Thread.sleep(time); } catch (Exception e) { log.info("线程休眠异常."); e.printStackTrace(); } }}
6.StepHandler业务处理handler
StepHandler是StepExecutor线程执行器,具体执行业务逻辑的入口。
StepHandler抽象类
每个具体的实现类都继承抽象的StepHandler。
public abstract class StepHandler { public StepHandler() {} public abstract void execute(StepModel stepModel);}
Step01Handler
Step01Handler是StepHandler实现类,从队列中取任务执行,执行完成后放入下一个业务处理器Step02Handler。
@Slf4jpublic class Step01Handler extends StepHandler { @Override public void execute(StepModel stepModel) { log.info("Step01Handler执行开始,stepModel: " + stepModel.toString()); OrderInfo orderInfo = (OrderInfo) stepModel.getObj(); List<StepResult> stepResultList = stepModel.getStepResultList(); try { log.info("Step01Handler执行,处理订单."); String orderNo = UUID.randomUUID().toString() .replace("-", "").toUpperCase(); orderInfo.setOrderNo(orderNo); orderInfo.setPlatformType("线上"); orderInfo.setOrderSource("Web"); stepModel.setFinished(false); stepModel.setPutInQueueNext(true); stepModel.setPutInQueueAgain(false); stepResultList.add(new StepResult(ConstantUtils.STEP_02, orderInfo)); } catch (Exception e) { stepModel.setFinished(false); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(true); stepResultList.add(new StepResult(ConstantUtils.STEP_01, orderInfo)); } log.info("Step01Handler执行完成,stepModel: " + stepModel.toString()); }}
Step02Handler
Step02Handler是StepHandler实现类,从队列中取任务执行。
@Slf4jpublic class Step02Handler extends StepHandler{ @Override public void execute(StepModel stepModel) { log.info("Step02Handler执行开始,stepModel: " + stepModel.toString()); OrderInfo orderInfo = (OrderInfo) stepModel.getObj(); List<StepResult> stepResultList = stepModel.getStepResultList(); try { orderInfo.setEndTime(System.currentTimeMillis()); stepModel.setFinished(true); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); log.info("Step02Handler执行,入库."); } catch (Exception e) { stepModel.setFinished(true); stepModel.setPutInQueueNext(false); stepModel.setPutInQueueAgain(false); } log.info("Step02Handler执行完成,stepModel: " + stepModel.toString()); }}
7.阻塞队列
BlockingQueue是线程安全的阻塞队列。
7.1 FlowQueue
FlowQueue,管理本例使用的两个阻塞队列。
public class FlowQueue { private static final LinkedBlockingQueue<StepModel> queueA = new LinkedBlockingQueue<StepModel>(); private static final LinkedBlockingQueue<StepModel> queueB = new LinkedBlockingQueue<StepModel>(); public static LinkedBlockingQueue<StepModel> getBlockingQueue(String queueName) { LinkedBlockingQueue<StepModel> queue = null; switch (queueName) { case "QUEUE_A": queue = queueA; break; case "QUEUE_B": queue = queueB; break; } return queue; }}
7.2 QueueUtils
QueueUtils,队列简易工具。
@Slf4jpublic class QueueUtils { public static StepModel getStepFromQueue( LinkedBlockingQueue<StepModel> queue) { StepModel stepModel = null; try { if (queue.size() > 0) { stepModel = queue.take(); } } catch (Exception e) { log.info("读队列异常."); e.printStackTrace(); } return stepModel; } public static void putStepPutInQueue( LinkedBlockingQueue<StepModel> queue, Object obj) { try { StepModel stepModel = new StepModel(obj); stepModel.setPutInQueueTime(System.currentTimeMillis()); queue.put(stepModel); } catch (Exception e) { log.info("写队列异常."); e.printStackTrace(); } } public static int getQueueSize( LinkedBlockingQueue<StepModel> queue) { int size = 0; try { size = queue.size(); } catch (Exception e) { log.info("获取队列Size异常."); e.printStackTrace(); } return size; }}
7.3 ConstantUtils
ConstantUtils,管理常量,即线程池名称。
public class ConstantUtils { public static final String STEP_01 = "STEP_01_THREAD_POOL"; public static final String STEP_02 = "STEP_02_THREAD_POOL";}
8.任务模型
任务模型,即具体需要处理对象,封装成线程使用的任务模型,这样可以把业务和流程框架解耦。
8.1 StepModel
StepModel,任务模型封装。
@Datapublic class StepModel { // 任务对象 private Object obj; // 任务执行结果 private List<StepResult> stepResultList; // 任务接收时间 private long putInQueueTime; // 任务完成标识 private boolean isFinished = false; // 任务重新放入队列标识 private boolean isPutInQueueAgain = false; // 任务放入下一个队列标识 private boolean isPutInQueueNext = false; public StepModel(Object object) { this.obj = object; this.stepResultList = new ArrayList<>(); }}
8.2 StepResult
StepResult,执行结果模型封装。
@Datapublic class StepResult { // 目标线程池名 private String nextStepName; // 执行结果 private Object result; public StepResult(String nextStepName,Object result){ this.nextStepName = nextStepName; this.result = result; }}
9.业务数据模型
业务数据模型,即生成具体需要处理的数据,在传入给线程池的线程执行前,需要封装成任务模型。
9.1 OrderInfo
OrderInfo,本例要处理的业务数据模型。
@Data@NoArgsConstructorpublic class OrderInfo { private String userName; private String orderNo; private String tradeName; private String platformType; private String orderSource; private long orderTime; private long endTime;}
9.2 ResultObj
ResultObj,web请求返回的统一封装对象。
@Data@NoArgsConstructor@AllArgsConstructor@Builderpublic class ResultObj { private String code; private String message;}
10.测试
包括web请求和后台任务
10.1 web请求
请求URL: http://127.0.0.1:8080/server/order/f1
入参:
{ "userName": "HangZhou0614", "tradeName": "Vue进阶教程"}
返回值:
{ "code": "200", "message": "成功"}
10.2 后台任务日志
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。