第一个并行程序
最近由于工作需要实现一个对日志的并行处理的程序(还要用到memcached)
我查了些,写了如下一个dome 希望对大家有帮助,里边有很多概念,今天中午吃饭我想出了一个例子来解释什么叫并行运算程序,下班了,代码贴上,明天有时间举个例子吧!
work.java
package com.work;
import java.util.List;
import com.work.job.Job;
/**
* 工作抽实现象类
* @author 郝豪
* @param <E>
*/
public interface Work<E,V>{
public void setWork(E work);
/**
* 当前任务
*/
public Job<E,V> startJob();
/**
* 任务总数
* @return
*/
public int jobSize();
/**
* 执行任务
*/
public void target();
/**
* 合并工作单元
*/
public void mergerJob();
/**
* 拆分工作单元
*/
public void separator();
public void execute(Job<E,V> job);
/**
* 结束工作
*/
public void workEnd();
/**
* 得到结果
* @return
*/
public V getResult();
/**
* 得到结果集合
* @return
*/
public List<V> getResults();
}
AbstractWork.java
package com.work;
import java.util.List;
import com.work.helper.JobPoolHelper;
import com.work.job.Job;
public abstract class AbstractWork<E,V> implements Work<E,V>{
JobPoolHelper<E,V> jobPool = new JobPoolHelper<E,V>();
protected int startIndex = 0;
protected boolean isEnd = false; //是否结束工作
protected E work;
protected V result;
private Job<E,V> job;
public void setWork(E work){
this.work = work;
}
/**
* 执行工作单元
*/
public void execute(Job<E,V> job){
jobPool.add(job);
this.job = job;
startIndex ++;
}
public void target(){
separator();//分拆任务
mergerJob();//合并任务
}
public int jobSize(){
return startIndex;
}
/**
* 结束并行运算
*/
public void workEnd(){
jobPool.close();
isEnd = true;
}
/**
* 当前执行的工作
*/
public Job<E,V> startJob() {
return job;
}
/**
* 得到结果
* @return
*/
public V getResult(){
return result;
}
/**
* 结果结合
* @return
*/
public List<V> getResults(){
return jobPool.getResult();
}
}
SumWork.java
package com.work;
import com.work.job.Job;
import com.work.job.SumJob;
/**
* 线性分割实现
* @author 郝豪
* @param <Long>
*/
public class SumWork extends AbstractWork<Integer[],Long> {
/**
* 合并工作
*/
public void mergerJob() {
Long sum = 0l;
for (long s:getResults()) {
sum += s;
}
super.result = sum;
}
public Integer[] subNumber(final Integer[] numbers, int start, int end){
Integer []subNumber =new Integer[end-start];
int i=0;
for (int j = start; j < end; j++) {
subNumber [i] = numbers[j];
i++;
}
return subNumber;
}
/**
* 分隔工作
*/
public void separator() {
Integer [] numbers = (Integer[])work;
int len = numbers.length;
int i = 0;
int pageSize = 3;
Job<Integer[],Long> sumJob = null;
int startIndex = 0;
while(i<len){
if((i+1)%pageSize==0){
sumJob = new SumJob();
sumJob.setJob(subNumber(numbers,(pageSize * startIndex),pageSize * (startIndex+1)));
execute(sumJob);
startIndex ++;
}
if(((i+1)%pageSize>0)&&(i+1)==len){
sumJob = new SumJob();
sumJob.setJob(subNumber(numbers,(pageSize * startIndex),len));
execute(sumJob);
startIndex ++;
}
i++;
}
}
}
job.java
package com.work.job;
import java.util.concurrent.Callable;
/**
* 工作单元
* @author 郝豪
* @param <E>
*/
public abstract class Job<E,V> implements Callable<V>{
protected String jobName;
protected E job;
public E getJob() {
return job;
}
public void setJob(E job) {
this.job = job;
}
/**
* 设置工作单元名
* @param jobName
*/
public void setJobName(String jobName) {
this.jobName = jobName;
}
/**
* 处理后的结果
* @throws Exception
*/
public V getResult() throws Exception{
return call();
}
/**
* 执行工作单元
*/
public abstract V call() throws Exception;
}
sumJob.java
package com.work.job;
public class SumJob extends Job<Integer[],Long> {
public Long call() throws Exception {
Integer[] numbers = (Integer[])job;
long sum = 0;
for (Integer i:numbers) {
sum += i;
}
return Long.valueOf(sum);
}
}
JobPoolHelper.java
package com.work.helper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.work.job.Job;
/**
* 处理工作单位的池
* @author 郝豪
*/
public class JobPoolHelper<E,V> {
private static ExecutorService exec;
private CompletionService<V> completionService;
private int cpuCoreNumber;
private int taskCount;
private Future<V> future;
/**
* 构造函数
*/
public JobPoolHelper(){
init();
}
/**
* 初始化
*/
public void init(){
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
completionService = new ExecutorCompletionService<V>(exec);
}
/**
* 处理任务
*/
public void add(Job<E,V> job){
if (!exec.isShutdown()) {
taskCount ++;
future = completionService.submit(job);
}
}
/**
* 当前执行的结果(稍微有点问题)
* @return
*/
public V getJobResult(){
V result = null;
try {
if(!future.isDone()){
result = future.get();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return result;
}
/**
* 关闭池
*/
public void close() {
exec.shutdown();
}
/**
* 得到结果
* @return list
*/
public List<V> getResult() {
List<V> list = new ArrayList<V>();
for (int i = 0; i < taskCount; i++) {
try {
list.add(completionService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return list;
}
}
Main.java
package com.work;
public class Main {
public static void main(String[] args){
Integer[] numbers = new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 ,111,22,333,44};
Work<Integer[],Long> work= new SumWork();
work.setWork(numbers);
work.target(); //分拆任务
work.workEnd(); //结束任务
System.out.println(work.getResult());
}
}
我查了些,写了如下一个dome 希望对大家有帮助,里边有很多概念,今天中午吃饭我想出了一个例子来解释什么叫并行运算程序,下班了,代码贴上,明天有时间举个例子吧!
work.java
package com.work;
import java.util.List;
import com.work.job.Job;
/**
* 工作抽实现象类
* @author 郝豪
* @param <E>
*/
public interface Work<E,V>{
public void setWork(E work);
/**
* 当前任务
*/
public Job<E,V> startJob();
/**
* 任务总数
* @return
*/
public int jobSize();
/**
* 执行任务
*/
public void target();
/**
* 合并工作单元
*/
public void mergerJob();
/**
* 拆分工作单元
*/
public void separator();
public void execute(Job<E,V> job);
/**
* 结束工作
*/
public void workEnd();
/**
* 得到结果
* @return
*/
public V getResult();
/**
* 得到结果集合
* @return
*/
public List<V> getResults();
}
AbstractWork.java
package com.work;
import java.util.List;
import com.work.helper.JobPoolHelper;
import com.work.job.Job;
public abstract class AbstractWork<E,V> implements Work<E,V>{
JobPoolHelper<E,V> jobPool = new JobPoolHelper<E,V>();
protected int startIndex = 0;
protected boolean isEnd = false; //是否结束工作
protected E work;
protected V result;
private Job<E,V> job;
public void setWork(E work){
this.work = work;
}
/**
* 执行工作单元
*/
public void execute(Job<E,V> job){
jobPool.add(job);
this.job = job;
startIndex ++;
}
public void target(){
separator();//分拆任务
mergerJob();//合并任务
}
public int jobSize(){
return startIndex;
}
/**
* 结束并行运算
*/
public void workEnd(){
jobPool.close();
isEnd = true;
}
/**
* 当前执行的工作
*/
public Job<E,V> startJob() {
return job;
}
/**
* 得到结果
* @return
*/
public V getResult(){
return result;
}
/**
* 结果结合
* @return
*/
public List<V> getResults(){
return jobPool.getResult();
}
}
SumWork.java
package com.work;
import com.work.job.Job;
import com.work.job.SumJob;
/**
* 线性分割实现
* @author 郝豪
* @param <Long>
*/
public class SumWork extends AbstractWork<Integer[],Long> {
/**
* 合并工作
*/
public void mergerJob() {
Long sum = 0l;
for (long s:getResults()) {
sum += s;
}
super.result = sum;
}
public Integer[] subNumber(final Integer[] numbers, int start, int end){
Integer []subNumber =new Integer[end-start];
int i=0;
for (int j = start; j < end; j++) {
subNumber [i] = numbers[j];
i++;
}
return subNumber;
}
/**
* 分隔工作
*/
public void separator() {
Integer [] numbers = (Integer[])work;
int len = numbers.length;
int i = 0;
int pageSize = 3;
Job<Integer[],Long> sumJob = null;
int startIndex = 0;
while(i<len){
if((i+1)%pageSize==0){
sumJob = new SumJob();
sumJob.setJob(subNumber(numbers,(pageSize * startIndex),pageSize * (startIndex+1)));
execute(sumJob);
startIndex ++;
}
if(((i+1)%pageSize>0)&&(i+1)==len){
sumJob = new SumJob();
sumJob.setJob(subNumber(numbers,(pageSize * startIndex),len));
execute(sumJob);
startIndex ++;
}
i++;
}
}
}
job.java
package com.work.job;
import java.util.concurrent.Callable;
/**
* 工作单元
* @author 郝豪
* @param <E>
*/
public abstract class Job<E,V> implements Callable<V>{
protected String jobName;
protected E job;
public E getJob() {
return job;
}
public void setJob(E job) {
this.job = job;
}
/**
* 设置工作单元名
* @param jobName
*/
public void setJobName(String jobName) {
this.jobName = jobName;
}
/**
* 处理后的结果
* @throws Exception
*/
public V getResult() throws Exception{
return call();
}
/**
* 执行工作单元
*/
public abstract V call() throws Exception;
}
sumJob.java
package com.work.job;
public class SumJob extends Job<Integer[],Long> {
public Long call() throws Exception {
Integer[] numbers = (Integer[])job;
long sum = 0;
for (Integer i:numbers) {
sum += i;
}
return Long.valueOf(sum);
}
}
JobPoolHelper.java
package com.work.helper;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import com.work.job.Job;
/**
* 处理工作单位的池
* @author 郝豪
*/
public class JobPoolHelper<E,V> {
private static ExecutorService exec;
private CompletionService<V> completionService;
private int cpuCoreNumber;
private int taskCount;
private Future<V> future;
/**
* 构造函数
*/
public JobPoolHelper(){
init();
}
/**
* 初始化
*/
public void init(){
cpuCoreNumber = Runtime.getRuntime().availableProcessors();
exec = Executors.newFixedThreadPool(cpuCoreNumber);
completionService = new ExecutorCompletionService<V>(exec);
}
/**
* 处理任务
*/
public void add(Job<E,V> job){
if (!exec.isShutdown()) {
taskCount ++;
future = completionService.submit(job);
}
}
/**
* 当前执行的结果(稍微有点问题)
* @return
*/
public V getJobResult(){
V result = null;
try {
if(!future.isDone()){
result = future.get();
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return result;
}
/**
* 关闭池
*/
public void close() {
exec.shutdown();
}
/**
* 得到结果
* @return list
*/
public List<V> getResult() {
List<V> list = new ArrayList<V>();
for (int i = 0; i < taskCount; i++) {
try {
list.add(completionService.take().get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return list;
}
}
Main.java
package com.work;
public class Main {
public static void main(String[] args){
Integer[] numbers = new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 ,111,22,333,44};
Work<Integer[],Long> work= new SumWork();
work.setWork(numbers);
work.target(); //分拆任务
work.workEnd(); //结束任务
System.out.println(work.getResult());
}
}
haohao
2009-05-21 19:25:22
评论:0
阅读:79
引用:0
