接上篇并行计算框架的Java实现--系列一 。
增加对结果的处理:
1、修改Job,实现Callable接口
public abstract class Job implements Callable<Object> {
@Override
public Object call() throws Exception {
Object result = this.execute();//执行子类具体任务
synchronized (Executer.LOCK) {
//处理完业务后,任务结束,递减线程数,同时唤醒主线程
Executer.THREAD_COUNT--;
Executer.LOCK.notifyAll();
}
return result;
}
/**
* 业务处理函数
*/
public abstract Object execute();
}
2、修改Executer,增加对结果的处理
public class Executer {
//计算已经派发的任务数(条件谓词)
public static int THREAD_COUNT = 0;
//存储任务的执行结果
private List<Future<Object>> futres = new ArrayList<Future<Object>>();
//条件队列锁
public static final Object LOCK = new Object();
//线程池
private ExecutorService pool = null;
public Executer() {
this(1);
}
public Executer(int threadPoolSize) {
pool = Executors.newFixedThreadPool(threadPoolSize);
}
/**
* 任务派发
* @param job
*/
public void fork(Job job){
//将任务派发给线程池去执行
futres.add(pool.submit(job));
//增加线程数
synchronized (LOCK) {
THREAD_COUNT++;
}
}
/**
* 统计任务结果
*/
public List<Object> join(){
synchronized (LOCK) {
while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成
System.out.println("threadCount: "+THREAD_COUNT);
try {
LOCK.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
List<Object> list = new ArrayList<Object>();
//取出每个任务的处理结果,汇总后返回
for (Future<Object> future : futres) {
try {
Object result = future.get();//因为任务都已经完成,这里直接get
list.add(result);
} catch (Exception e) {
e.printStackTrace();
}
}
return list;
}
}
3、测试:
public static void main(String[] args) {
//初始化任务池
Executer exe = new Executer(5);
//初始化任务
long time = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
MyJob job = new MyJob();
exe.fork(job);//派发任务
}
//汇总任务结果
List<Object> list = exe.join();
System.out.println("Result: "+list);
System.out.println("time: "+(System.currentTimeMillis() - time));
}
4、执行结果:
threadCount: 10
running thread id = 9
running thread id = 11
running thread id = 8
running thread id = 10
running thread id = 12
threadCount: 5
running thread id = 9
running thread id = 8
running thread id = 11
running thread id = 12
running thread id = 10
Result: [8, 9, 10, 11, 12, 8, 11, 12, 9, 10]
time: 2000
5、附件是完整代码
分享到:
相关推荐
采用java8的并行计算,完成1到400亿的数列求和,并行计算完成时间3秒左右,采用普通计算完成时间13秒左右,效果明显
在大数据环境下,针对数据型统计分析系统性能劣化明显、不能满足用户使用需求的问题,本文提出了一种轻量级高性能对象化并行计算架构,研制了该架构的对象服务组件、对象管理服务组件和客户端代理组件,并将该架构和...
算法中的并行使用java的Fork / Join框架实现,他会将进程使用ForkJoinPool进行管理,并自动分配到空闲的CPU核心上来运算。由于个人PC的CPU核心数量较少,所以预期至多能产生常数倍的加速 效果。 本次实验使用的实验...
MPJ并行编程框架的实现及安装配置 mpi mpj 并行计算 大规模计算, mpj是mpi 的java实现本文里面提供了windows和linux变成范例,帮助你进入并行计算的大门。
}}Disruptor costTime = 7458ms并发编程框架核心讲解2.1 Disruptor-QuickStart-基础元素工厂类Disruptor
分布式并行计算框架PARALL 应用场景:解决实时统计一些复杂的报表,统计的数据量大,而且数据来源多个不同的表结构,最终界面展示超长时间。 解决方案:为了提高效率,用多线程按列去统计,在所有线程完成统计任务...
学习资料:淘宝分布式并行计算四合一框架Fourinone 内容:淘宝分布式并行计算四合一框架Fourinone详细介绍;源码分析及使用说明;Fourinone分布式计算框架性能、压力、容灾测试报告;框架压缩包 本示例把分配任务的...
网格计算框架JPPF,Java Parallel Processing Framework,即java并行处理框架.一个开放源码的网格计算框架,它可以在一个分布执行环境中 同时运行多个java应用.这是他的最新版1.5版
tinympi4j 是一款微型的 java 分布式离线计算框架, 实现原理如图: 特性 简单直观, 没有任何学习难度 slave支持多个任务并发/并行执行 使用HTTP协议通信 场景:
主要介绍了浅谈Java Fork/Join并行框架,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
并行编程在高性能计算领域的应用越来越广泛。国家863计划项目“网格服务环境结点建设及其支撑技术研究”的子课题 “用户开发环境研究”,旨在开发出以客户端/服务器模式运行的,能在远程编辑、编译、运行、调试并行...
对于有一定基础或热衷于研究的人来说,可以在这些基础代码上进行修改和扩展,实现其他功能。 【沟通交流】: 有任何使用上的问题,欢迎随时与博主沟通,博主会及时解答。 鼓励下载和使用,并欢迎大家互相学习,共同...
第2章介绍了 Java 并行程序开发的基础, 包括 Java 中 Thread 的基本使用方法等第3章介绍了 JDK 内部对并行程序开发的支持, 主要介绍 JUC (Java.util.concurrent) 中些工具的使用方法、 各自特点及它们的内部实现...
C++并行计算与异步网络框架。搜狗公司C++服务器引擎,编程范式。支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每日处理数百亿请求。这是一个设计轻盈优雅的企业级程序引擎,可以满足...
akka java源码并行与并发 包含各种并行和多线程概念的源代码,包括: 线程数 固有锁(监控器) 等待,通知 执行者 可调用项和将来的任务 并发集合 原子引用和整数 Akka框架
主题建模 该项目在使用Java和C的Hadoop MapReduce和MPI等并行计算框架上,使用潜在Dirichlet分配(LDA)算法评估主题建模的性能。
Java高并发相关知识点包括: 线程:Java多线程的实现方式,包括继承...并行计算:Java中的并行计算,包括Fork/Join框架、并行流等。 线程间通信:Java中的线程间通信,包括wait()、notify()、notifyAll()等方法。
2 开放手机联盟 --Open --Open --Open --Open Handset Handset Handset Handset Alliance Alliance Alliance Alliance 什么是开放手机联盟? 开放手机联盟, Open Handset Alliance :是美国 Google 公司与 2007 年 ...
本文来自于csdn,这篇文章讲解了分布式计算框架的核心内容、架构图详解,运用流程等Hadoop是Apache软件基金会所开发的并行计算框架与分布式文件系统。最核心的模块包括HadoopCommon、HDFS与MapReduce。HDFSHDFS是...