`
378629846
  • 浏览: 213072 次
  • 性别: Icon_minigender_1
  • 来自: 哈尔滨
社区版块
存档分类
最新评论

并行计算框架的Java实现--系列三

    博客分类:
  • java
 
阅读更多

接上篇并行计算框架的Java实现--系列

 

优化锁,之前的锁是采用一个static的Object实现的,这样会有一个问题,如果我创建了多个Executer,那么所有Job都会持有一把锁,既影响性能,也容易出现死锁的情况。所以,改成每个Executer持有一把锁。

Executer代码如下:

 public class Executer {

	//存储任务的执行结果
	private List<Future<Object>> futres = new ArrayList<Future<Object>>(); 
	//条件队列锁,以及线程计数器
	public final Lock lock = new Lock();
	//线程池
	private ExecutorService pool = null;
	public Executer() {
		this(1);
	}
	public Executer(int threadPoolSize) {
		pool = Executors.newFixedThreadPool(threadPoolSize);
	}
	/**
	 * 任务派发
	 * @param job
	 */
	public void fork(Job job){
		//设置同步锁
		job.setLock(lock);
		//将任务派发给线程池去执行
		futres.add(pool.submit(job));
		//增加线程数
		synchronized (lock) {
			lock.thread_count++;
		}
	}
	/**
	 * 统计任务结果
	 */
	public List<Object> join(){
		synchronized (lock) {
			while(lock.thread_count > 0){//检查线程数,如果为0,则表示所有任务处理完成
//				System.out.println("threadCount: "+THREAD_COUNT);
				try {
					lock.wait();//如果任务没有全部完成,则挂起。等待完成的任务给予通知
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
		List<Object> list = new ArrayList<Object>();
		//取出每个任务的处理结果,汇总后返回
		for (Future<Object> future : futres) {
			try {
				Object result = future.get();//因为任务都已经完成,这里直接get
				if(result != null){
					if(result instanceof List)
						list.addAll((List)result);
					else
						list.add(result);
				}
			} catch (Exception e) {
				e.printStackTrace();
			} 
		}
		return list;
	}
}
 

 Job类:

 

public abstract class Job implements Callable<Object> {

	//锁
	private Lock lock = null;

	void setLock(Lock lock) {
		this.lock = lock;
	}

	public Object call() throws Exception {
		Object result = null;
		try{
			result = this.execute();//执行子类具体任务
		}catch(Exception e){
			e.printStackTrace();
		}
		synchronized (lock) {
			//处理完业务后,任务结束,递减线程数,同时唤醒主线程
			lock.thread_count--;
			lock.notifyAll();
		}
		return result;
	}
	/**
	 * 业务处理函数
	 */
	public abstract Object execute();
	
}
class Lock {
	
	int thread_count;
	
}
 

 测试结果:

 

threadCount: 10
running thread id = 8
running thread id = 10
running thread id = 9
running thread id = 12
running thread id = 11
threadCount: 8
threadCount: 7
threadCount: 6
threadCount: 5
running thread id = 12
running thread id = 8
running thread id = 11
threadCount: 2
running thread id = 10
threadCount: 1
running thread id = 9
ResultSize: 10
time: 2001

 OK!

这样每个Executer就可以使用自己的lock,而相互不受同步的影响

 

2
2
分享到:
评论
2 楼 378629846 2013-05-14  
golly2009 写道
...如果我创建了多个Executer,那么所有Job都会持有一把锁,既影响性能,也容易出现死锁的情况。....

应该是所有job都会持有同一把锁吧,因为是静态成员,类在虚拟机中只有一份,所有Executer类的对象共享。
可以设计成单例不?系统中只有一个Executer实例,这样就不会出现死锁,另外你只有主线程在等待,可以把notifyAll换成notify不?

不能是单例的,因为一个executer是一个锁来控制的,如果是单例的,这个executer就只能执行一批job了,不能复用了
1 楼 golly2009 2013-05-08  
...如果我创建了多个Executer,那么所有Job都会持有一把锁,既影响性能,也容易出现死锁的情况。....

应该是所有job都会持有同一把锁吧,因为是静态成员,类在虚拟机中只有一份,所有Executer类的对象共享。
可以设计成单例不?系统中只有一个Executer实例,这样就不会出现死锁,另外你只有主线程在等待,可以把notifyAll换成notify不?

相关推荐

Global site tag (gtag.js) - Google Analytics