`
378629846
  • 浏览: 213076 次
  • 性别: Icon_minigender_1
  • 来自: 哈尔滨
社区版块
存档分类
最新评论

并行计算框架的Java实现--系列二

    博客分类:
  • java
 
阅读更多

接上篇并行计算框架的Java实现--系列一

增加对结果的处理:

1、修改Job,实现Callable接口

public abstract class Job implements Callable<Object> {

	@Override
	public Object call() throws Exception {
		Object result = this.execute();//执行子类具体任务
		synchronized (Executer.LOCK) {
			//处理完业务后,任务结束,递减线程数,同时唤醒主线程
			Executer.THREAD_COUNT--;
			Executer.LOCK.notifyAll();
		}
		return result;
	}
	/**
	 * 业务处理函数
	 */
	public abstract Object execute();

}

 

2、修改Executer,增加对结果的处理

public class Executer {
	//计算已经派发的任务数(条件谓词)
	public static int THREAD_COUNT = 0;
	//存储任务的执行结果
	private List<Future<Object>> futres = new ArrayList<Future<Object>>(); 
	//条件队列锁
	public static final Object LOCK = new Object();
	//线程池
	private ExecutorService pool = null;
	public Executer() {
		this(1);
	}
	public Executer(int threadPoolSize) {
		pool = Executors.newFixedThreadPool(threadPoolSize);
	}
	/**
	 * 任务派发
	 * @param job
	 */
	public void fork(Job job){
		//将任务派发给线程池去执行
		futres.add(pool.submit(job));
		//增加线程数
		synchronized (LOCK) {
			THREAD_COUNT++;
		}
	}
	/**
	 * 统计任务结果
	 */
	public List<Object> join(){
		synchronized (LOCK) {
			while(THREAD_COUNT > 0){//检查线程数,如果为0,则表示所有任务处理完成
				System.out.println("threadCount: "+THREAD_COUNT);
				try {
					LOCK.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		List<Object> list = new ArrayList<Object>();
		//取出每个任务的处理结果,汇总后返回
		for (Future<Object> future : futres) {
			try {
				Object result = future.get();//因为任务都已经完成,这里直接get
				list.add(result);
			} catch (Exception e) {
				e.printStackTrace();
			} 
		}
		return list;
	}
}

 

 3、测试:

public static void main(String[] args) {
		//初始化任务池
		Executer exe = new Executer(5);
		//初始化任务
		long time = System.currentTimeMillis();
		for (int i = 0; i < 10; i++) {
			MyJob job = new MyJob();
			exe.fork(job);//派发任务
		}
		//汇总任务结果
		List<Object> list = exe.join();
		System.out.println("Result: "+list);
		System.out.println("time: "+(System.currentTimeMillis() - time));
	}

 

4、执行结果:

threadCount: 10
running thread id = 9
running thread id = 11
running thread id = 8
running thread id = 10
running thread id = 12
threadCount: 5
running thread id = 9
running thread id = 8
running thread id = 11
running thread id = 12
running thread id = 10
Result: [8, 9, 10, 11, 12, 8, 11, 12, 9, 10]
time: 2000

 

5、附件是完整代码

  • src.rar (2.2 KB)
  • 下载次数: 82
0
2
分享到:
评论
1 楼 lishuoying 2014-07-01  
建议:
1、pool 用完最好shutdown
2、join设计成接口回调更好

相关推荐

Global site tag (gtag.js) - Google Analytics