- 浏览: 213037 次
- 性别:
- 来自: 哈尔滨
文章分类
最新评论
-
lizhenzhendebishe:
提示An error was discovered proce ...
WebService:Axis客户端调用需要身份验证的CXF服务 -
yuanliangding:
学习了。不太接触底层的东西
UNIX系统的IO模型 -
_copythat:
加油,。。。
阳光总会在风雨之后洒向苍茫 -
donlianli:
莫非去淘宝菜鸟网络了?
阳光总会在风雨之后洒向苍茫 -
菜鸟小于:
我也是哈尔滨的,在广州做了三年的开发,可是实际上我们是在维护一 ...
阳光总会在风雨之后洒向苍茫
最近的工作需要统计一些复杂的报表,为了提高效率,想用多线程去实现,但要在所有线程完成统计任务后,将结果汇总。所以在思考有没有什么办法解决,之所以是“系列一”是因为我想记录下我的思考过程。
1、首先设计一个Executer,负责任务的执行和汇总:
public class Executer { //计算已经派发的任务数(条件谓词) public static int THREAD_COUNT = 0; //线程池 private Executor pool = null; public Executer() { this(1); } public Executer(int threadPoolSize) { pool = Executors.newFixedThreadPool(threadPoolSize); } /** * 任务派发 * @param job */ public void fork(Job job){ //将任务派发给线程池去执行 pool.execute(job); THREAD_COUNT++; } /** * 统计任务结果 */ public void join(){ while(THREAD_COUNT > 0){ System.out.println("threadCount: "+THREAD_COUNT); try { wait();//如果任务没有全部完成,则挂起 } catch (Exception e) {}//这里总是抛异常,不知道为什么,好吧!先不管它 } } }
2、写一个抽象的Job类,负责执行具体的任务
public abstract class Job implements Runnable { @Override public void run() { this.execute();//执行子类具体任务 Executer.THREAD_COUNT--; try{ notifyAll();//这里总是抛异常,不知道为什么,好吧!先不管它 }catch(Exception e){} } /** * 业务处理函数 */ public abstract void execute(); }
3、测试,先来一个具体的任务实现。
public class MyJob extends Job { @Override public void execute() { //模拟业务需要处理1秒. try {Thread.sleep(1000);} catch (InterruptedException e) {} System.out.println("running thread id = "+Thread.currentThread().getId()); } }
4、测试。
public class Test { public static void main(String[] args) { //初始化任务池 Executer exe = new Executer(5); //初始化任务 long time = System.currentTimeMillis(); for (int i = 0; i < 10; i++) { MyJob job = new MyJob(); exe.fork(job);//派发任务 } //汇总任务结果 exe.join(); System.out.println("time: "+(System.currentTimeMillis() - time)); } }
5、好吧,看一下结果
threadCount: 10 ......(表示有N多个) threadCount: 10 running thread id = 8 running thread id = 9 running thread id = 11 running thread id = 10 running thread id = 12 threadCount: 5 ......(表示有N多个) threadCount: 5 running thread id = 9 running thread id = 10 running thread id = 12 running thread id = 8 running thread id = 11 threadCount: 3 time: 2032
哈哈,看来是可以了,最后汇总任务的处理时间是2032毫秒,看来是比单个任务顺序执行来的快。但是有几个问题:
1)如果没有catch那个超级Exception的话,就会抛下面的异常:
java.lang.IllegalMonitorStateException at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:485) at com.one.Executer.join(Executer.java:38) at com.test.Test.main(Test.java:21)
2)为啥会打印N多个同样值threadCount呢?
于是和同事(河东)沟通,他说wait要放在synchronized里面才行,好吧,试一下,改进一下Executer和Job
public class Executer { //计算已经派发的任务数(条件谓词) public static int THREAD_COUNT = 0; //条件队列锁 public static final Object LOCK = new Object(); //线程池 private Executor pool = null; public Executer() { this(1); } public Executer(int threadPoolSize) { pool = Executors.newFixedThreadPool(threadPoolSize); } /** * 任务派发 * @param job */ public void fork(Job job){ //将任务派发给线程池去执行 pool.execute(job); //增加线程数 synchronized (LOCK) { THREAD_COUNT++; } } /** * 统计任务结果 */ public void join(){ synchronized (LOCK) { while(THREAD_COUNT > 0){ System.out.println("threadCount: "+THREAD_COUNT); try { LOCK.wait();//如果任务没有全部完成,则挂起 } catch (InterruptedException e) { e.printStackTrace(); } } } } }
public abstract class Job implements Runnable { @Override public void run() { this.execute();//执行子类具体任务 synchronized (Executer.LOCK) { //处理完业务后,任务结束,递减线程数,同时唤醒主线程 Executer.THREAD_COUNT--; Executer.LOCK.notifyAll(); } } /** * 业务处理函数 */ public abstract void execute(); }
6、测试一下:
threadCount: 10 running thread id = 8 running thread id = 11 running thread id = 9 threadCount: 7 running thread id = 10 threadCount: 6 running thread id = 12 threadCount: 5 running thread id = 11 running thread id = 12 running thread id = 10 threadCount: 2 running thread id = 9 running thread id = 8 threadCount: 1 time: 2016
还真的行,谢谢河东哈!
但是原因是什么呢?回去查了查书《Java并发编程实践》,见附件!
第14.2.1节这样说: 在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。 ... 由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。 14.2.4节: 由于在调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程应该尽快地释放,从而确保正在等待的线程尽可能尽快的解除阻塞。
看来之前是不会用wait和notify,哈哈~!
感谢河东,和你交流收获很大!
顺便测试一下java多线程情况下,多核CPU的利用率,修改上面的线程池大小和任务数(2个线程处理1000000个任务,去掉MyJob的sleep(这样可以多抢些CPU时间),结果如下:
看来window下是可以利用多核的,虽然是一个JVM进程。之前和斯亮讨论的结论是错误的。
- JAVA并发编程实践.part1.rar (8.6 MB)
- 下载次数: 754
- JAVA并发编程实践.part2.rar (8.6 MB)
- 下载次数: 668
- JAVA并发编程实践.part3.rar (8.6 MB)
- 下载次数: 352
- JAVA并发编程实践.part5.rar (2 MB)
- 下载次数: 87
- JAVA并发编程实践.part4.rar (8.6 MB)
- 下载次数: 508
评论
3 楼
378629846
2014-04-18
xinke0802 写道
将多线程的结果写入同一个文件的时候,遇到这样的问题~
java.io.IOException: Stream closed;
不知道楼主遇到没有啊~~
java.io.IOException: Stream closed;
不知道楼主遇到没有啊~~
多线程写入同一个文件,要加锁的。可以贴代码上来看看么?
2 楼
xinke0802
2014-03-21
将多线程的结果写入同一个文件的时候,遇到这样的问题~
java.io.IOException: Stream closed;
不知道楼主遇到没有啊~~
java.io.IOException: Stream closed;
不知道楼主遇到没有啊~~
1 楼
Ramls
2013-02-28
为什么执行完毕后,程序还是挂起的呢?
发表评论
-
使用zookeeper实现分布式共享锁
2013-04-13 15:59 2562分布式系统中经常需要协调多进程,多个jvm,或者多台机器之间 ... -
WebService:Axis客户端调用需要身份验证的CXF服务
2012-11-24 12:05 7028CXF服务端代码: 1、web.xml配置 < ... -
使用axis轻松调用Webservice
2012-11-10 13:15 21861使用axis1.4调用webservice有两种简单的方式: ... -
执行java程序时如何引用依赖的jar
2012-11-10 12:38 1753在执行java程序时我们可以通过-Djava.ext.dirs ... -
Log4j简单实用配置
2012-10-27 12:22 3633#A1为控制台输出,A2为文件输出,R为文件输出,并且按 ... -
JavaNIO处理长连接
2012-09-12 21:04 14048之前在IBM的网站上看到过一篇介绍NIO的文章,收获很大。但文 ... -
Int和byte数组之间的转换
2012-08-27 20:25 18751有时候和C的程序通信的时候,我们在封装协议时,可能需要将Jav ... -
用闭锁测试HashMap的并发写入问题
2012-08-27 19:39 3622今天无意中看到以前写 ... -
LDAP查询分页,基于迭代器的查询分页
2012-08-25 17:26 8447LDAP服务器端可以支持分页查询,但是有个前提条件 ... -
并行计算框架的Java实现--系列三
2012-07-14 14:38 4107接上篇并行计算框架的Java实现--系列二 优化锁,之 ... -
并行计算框架的Java实现--系列二
2012-07-14 08:41 2893接上篇并行计算框架的J ... -
ant初探
2012-06-29 13:38 2866前些天和同事交流,他说ant非常好用,他一直在用,学习资料共享 ... -
基于事件的 NIO 多线程服务器
2012-06-27 17:06 1204JDK1.4 的 NIO 有效解决了原有流式 IO 存在的线程 ... -
Java NIO学习
2012-06-22 22:46 0端午节加班要开发一个SocketServer,需要承载1000 ... -
httpclient访问https服务,可以信任证书
2012-06-02 00:48 6805private HttpClient initHttpClie ... -
Java或Web工程中查找配置文件
2012-06-02 00:35 1030String path = ""; URL ... -
如何获取真实的终端IP
2011-11-24 11:16 1186在有Apache做负载均衡的时候使用request.getRe ... -
MANIFEST.MF的应用以及如何读取jar包外的log4j.properties
2011-11-22 18:21 34MANIFEST.MF是jar文件的配置文件,在用eclips ... -
java实现的telnet协议
2011-10-02 17:14 7340package telnet; import j ... -
很好用的java反编译工具
2011-10-02 16:36 1206可以反编译单个文件,也可以反编译整个jar
相关推荐
NULL 博文链接:https://shenxueliang.iteye.com/blog/1592577
采用java8的并行计算,完成1到400亿的数列求和,并行计算完成时间3秒左右,采用普通计算完成时间13秒左右,效果明显
在大数据环境下,针对数据型统计分析系统性能劣化明显、不能满足用户使用需求的问题,本文提出了一种轻量级高性能对象化并行计算架构,研制了该架构的对象服务组件、对象管理服务组件和客户端代理组件,并将该架构和...
算法中的并行使用java的Fork / Join框架实现,他会将进程使用ForkJoinPool进行管理,并自动分配到空闲的CPU核心上来运算。由于个人PC的CPU核心数量较少,所以预期至多能产生常数倍的加速 效果。 本次实验使用的实验...
}}Disruptor costTime = 7458ms并发编程框架核心讲解2.1 Disruptor-QuickStart-基础元素工厂类Disruptor
MPJ并行编程框架的实现及安装配置 mpi mpj 并行计算 大规模计算, mpj是mpi 的java实现本文里面提供了windows和linux变成范例,帮助你进入并行计算的大门。
分布式并行计算框架PARALL 应用场景:解决实时统计一些复杂的报表,统计的数据量大,而且数据来源多个不同的表结构,最终界面展示超长时间。 解决方案:为了提高效率,用多线程按列去统计,在所有线程完成统计任务...
学习资料:淘宝分布式并行计算四合一框架Fourinone 内容:淘宝分布式并行计算四合一框架Fourinone详细介绍;源码分析及使用说明;Fourinone分布式计算框架性能、压力、容灾测试报告;框架压缩包 本示例把分配任务的...
网格计算框架JPPF,Java Parallel Processing Framework,即java并行处理框架.一个开放源码的网格计算框架,它可以在一个分布执行环境中 同时运行多个java应用.这是他的最新版1.5版
tinympi4j 是一款微型的 java 分布式离线计算框架, 实现原理如图: 特性 简单直观, 没有任何学习难度 slave支持多个任务并发/并行执行 使用HTTP协议通信 场景:
主要介绍了浅谈Java Fork/Join并行框架,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
本文设计实现了一个能独立运行的并行程序编辑器。首先在需求分析的基础上,提出了并行程序编辑器的总体框架。然后对高亮显示这一核心问题进行了深入剖析,并对文件基本操作、基本编辑功能、关键字查找与替换、格式...
【项目资源】: 包含前端、后端、移动开发、操作系统、人工智能、物联网、...# 注意 1. 本资源仅用于开源学习和技术交流。不可商用等,一切后果由使用者承担。 2. 部分字体以及插图等来自网络,若是侵权请联系删除。
第1章主要介绍了并行计算中相关的 些基本概念, 树立读者对并行计算的基本认识;介绍了两个重要的并行性能评估定律, 以及 Java 内存模型 JMM。第2章介绍了 Java 并行程序开发的基础, 包括 Java 中 Thread 的基本...
C++并行计算与异步网络框架。搜狗公司C++服务器引擎,编程范式。支撑搜狗几乎所有后端C++在线服务,包括所有搜索服务,云输入法,在线广告等,每日处理数百亿请求。这是一个设计轻盈优雅的企业级程序引擎,可以满足...
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是...
akka java源码并行与并发 包含各种并行和多线程概念的源代码,包括: 线程数 固有锁(监控器) 等待,通知 执行者 可调用项和将来的任务 并发集合 原子引用和整数 Akka框架
主题建模 该项目在使用Java和C的Hadoop MapReduce和MPI等并行计算框架上,使用潜在Dirichlet分配(LDA)算法评估主题建模的性能。
Java高并发相关知识点包括: 线程:Java多线程的实现方式,包括继承...并行计算:Java中的并行计算,包括Fork/Join框架、并行流等。 线程间通信:Java中的线程间通信,包括wait()、notify()、notifyAll()等方法。
� Android 更像一款桌面环境为 Java 的 Linux 操作系统。有助于 Google 实现其 " 随时随地为每个人提供信 息 " 的企业战略。 HTC HTC HTC HTC Dream/G1 Dream/G1 Dream/G1 Dream/G1 具体配置 硬件 3.17 英寸 HVGA ...