PoweJob高级特性()

  本篇文章为你整理了PoweJob高级特性()的详细内容,包含有 PoweJob高级特性,希望能帮助你了解 PoweJob高级特性。

  由于网上搜索 PowerJob MapReduce 都是设计原理,demo也展示个空壳子,没有演示Map到Reduce结果怎么传递,对于没有MR开发经验的人来说并没有什么帮助,所以这里写了一个有完整计算意义的demo供参考。

  代码功能:

  实现一个sum累加。

  任务输入参数:

  batchSize=100 batchNum=10,
 

  其中batchSize表示每个子任务大小,这里就是一个子任务负责100个数据累加。
 

  batchNum表示批次大小,也就是本次分发为10个子任务来完成。
 

  执行过程就是:Map过程是将本次任务划分为10个子任务,每个子任务分别完成1累加到100,101累加到201,...,以此类推。Reduce过程获取每个子任务的执行结果汇总累加,返回结果值。

  

package org.example.demo;

 

  import com.google.common.base.Splitter;

  import com.google.common.collect.Lists;

  import org.apache.commons.lang3.math.NumberUtils;

  import org.springframework.stereotype.Component;

  import tech.powerjob.common.serialize.JsonUtils;

  import tech.powerjob.worker.core.processor.ProcessResult;

  import tech.powerjob.worker.core.processor.TaskContext;

  import tech.powerjob.worker.core.processor.TaskResult;

  import tech.powerjob.worker.core.processor.sdk.MapReduceProcessor;

  import tech.powerjob.worker.log.OmsLogger;

  import java.io.Serializable;

  import java.util.List;

  import java.util.Map;

   * 控制台参数 batchSize=100 batchNum=10

   * @author zhengqian

   * @date 2022.05.30

  @Component

  public class MRSumProcessor implements MapReduceProcessor {

   @Override

   public ProcessResult process(TaskContext context) throws Exception {

   OmsLogger omsLogger = context.getOmsLogger();

   System.out.println("============== TestMapReduceProcessor#process ==============");

   System.out.println("isRootTask:" + isRootTask());

   System.out.println("taskContext:" + JsonUtils.toJSONString(context));

   if (isRootTask()) {

   System.out.println("==== MAP ====");

   omsLogger.info("[DemoMRProcessor] start root task~");

   // 根据控制台参数获取MR批次及子任务大小

   Map String, String jobParams = Splitter.on(" ").withKeyValueSeparator("=").split(context.getJobParams());

   Integer batchSize = Integer.parseInt(jobParams.getOrDefault("batchSize", "100"));

   Integer batchNum = Integer.parseInt(jobParams.getOrDefault("batchNum", "10"));

   List SubTaskParam subTasks = Lists.newLinkedList();

   for (int j = 0; j batchNum; j++) {

   subTasks.add(new SubTaskParam(j * batchSize + 1, (j + 1) * batchSize));

   map(subTasks, "INFO");

   subTasks.clear();

   omsLogger.info("[DemoMRProcessor] map success~");

   return new ProcessResult(true, "MAP_SUCCESS");

   } else if (context.getTaskName().equals("INFO")) {

   // 子任务执行

   SubTaskParam subTaskParam = (SubTaskParam) context.getSubTask();

   omsLogger.info(subTaskParam.toString());

   long sum = 0L;

   for (int x = subTaskParam.getStart(); x = subTaskParam.getEnd(); x++) {

   sum += x;

   omsLogger.info("[DemoMRProcessor] start={}, end={}, sum={}", subTaskParam.getStart(), subTaskParam.getEnd(), sum);

   return new ProcessResult(true, String.valueOf(sum));

   return new ProcessResult(false);

   @Override

   public ProcessResult reduce(TaskContext context, List TaskResult taskResults) {

   log.info("================ MapReduceProcessorDemo#reduce ================");

   log.info("TaskContext: {}", JsonUtils.toJSONString(context));

   log.info("List TaskResult : {}", JsonUtils.toJSONString(taskResults));

   context.getOmsLogger().info("MapReduce job finished, result is {}.", taskResults);

   long sum = 0L;

   for (TaskResult taskResult : taskResults) {

   String result = taskResult.getResult();

   if (NumberUtils.isDigits(result)) {

   sum += Long.parseLong(result);

   return new ProcessResult(true, sum + ": " + sum);

   private static class SubTaskParam implements Serializable {

   private int start;

   private int end;

   public SubTaskParam() {}

   public SubTaskParam(int start, int end) {

   this.start = start;

   this.end = end;

   public int getStart() {

   return start;

   public void setStart(int start) {

   this.start = start;

   public int getEnd() {

   return end;

   public void setEnd(int end) {

   this.end = end;

   @Override

   public String toString() {

   return start + ":" + end;

  以上就是PoweJob高级特性()的详细内容,想要了解更多 PoweJob高级特性的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: