----------------------------------------------------------------------------------------------------------------------------------------------------------

 泡牛吧!

                                       希望越来越多的光棍能够泡到牛

-----------------------------------------------------------------------------------------------------------------------------------------------------------

第一个并行程序
最近由于工作需要实现一个对日志的并行处理的程序(还要用到memcached)
我查了些,写了如下一个dome 希望对大家有帮助,里边有很多概念,今天中午吃饭我想出了一个例子来解释什么叫并行运算程序,下班了,代码贴上,明天有时间举个例子吧!
work.java
package com.work;

import java.util.List;

import com.work.job.Job;

/**
 * 工作抽实现象类
 * @author 郝豪
 * @param <E>
 */
public interface Work<E,V>{
   
    public void setWork(E work);
    /**
     * 当前任务
     */
    public Job<E,V> startJob();
    /**
     * 任务总数
     * @return
     */
    public int jobSize();
    /**
     * 执行任务
     */
    public void target();
    /**
     * 合并工作单元
     */
    public  void mergerJob();
    /**
     * 拆分工作单元
     */
    public  void separator();
   
   
    public void execute(Job<E,V> job);
    /**
     * 结束工作
     */
    public void workEnd();
    /**
     * 得到结果
     * @return
     */
    public V getResult();
    /**
     * 得到结果集合
     * @return
     */
    public List<V> getResults();

}

AbstractWork.java

package com.work;

import java.util.List;

import com.work.helper.JobPoolHelper;
import com.work.job.Job;

public abstract class AbstractWork<E,V>  implements Work<E,V>{
    JobPoolHelper<E,V> jobPool = new JobPoolHelper<E,V>();
    protected int startIndex = 0;
    protected boolean isEnd = false; //是否结束工作
    protected E work;
    protected V result;
   
    private Job<E,V> job;
    public void setWork(E work){
        this.work = work;
    }
    /**
     * 执行工作单元
     */
    public void execute(Job<E,V>  job){
        jobPool.add(job);
        this.job = job;
        startIndex ++;
    }
    public void target(){
        separator();//分拆任务
        mergerJob();//合并任务
    }
    public int jobSize(){
        return startIndex;
    }
    /**
     * 结束并行运算
     */
    public void workEnd(){
        jobPool.close();
        isEnd = true;
    }
    /**
     * 当前执行的工作
     */
    public Job<E,V> startJob() {
       
        return job;
    }
    /**
     * 得到结果
     * @return
     */
    public V getResult(){
        return result;
    }
    /**
     * 结果结合
     * @return
     */
    public List<V> getResults(){
        return jobPool.getResult();
    }
}
   
SumWork.java 

package com.work;

import com.work.job.Job;
import com.work.job.SumJob;
/**
 *  线性分割实现
 * @author 郝豪
 * @param <Long>
 */
public class SumWork extends AbstractWork<Integer[],Long> {
   
    
    /**
     * 合并工作
     */
    public void mergerJob() {
        Long sum = 0l;
        for (long s:getResults()) {        
            sum += s;
        }
        super.result = sum;
    }
    public Integer[] subNumber(final Integer[] numbers, int start, int end){
        Integer []subNumber =new  Integer[end-start];
        int i=0;
        for (int j = start; j < end; j++) {
            subNumber [i] = numbers[j];
            i++;
        }
        return subNumber;
        
    }
    /**
     * 分隔工作
     */
    public void separator() {
        Integer [] numbers = (Integer[])work;
        int len = numbers.length;
        int i = 0;
        int pageSize = 3;
        Job<Integer[],Long> sumJob = null;
        int startIndex = 0;
        while(i<len){
            if((i+1)%pageSize==0){
                sumJob = new SumJob();
                sumJob.setJob(subNumber(numbers,(pageSize * startIndex),pageSize * (startIndex+1)));
                execute(sumJob);
                startIndex ++;
            }
            if(((i+1)%pageSize>0)&&(i+1)==len){
                sumJob = new SumJob();
                sumJob.setJob(subNumber(numbers,(pageSize * startIndex),len));
                execute(sumJob);
                startIndex ++;
            }
            i++;
        }
    }
   
   
   
   
}
 



job.java
package com.work.job;

import java.util.concurrent.Callable;

/**
 * 工作单元
 * @author 郝豪
 * @param <E>
 */
public abstract class  Job<E,V> implements Callable<V>{
   
    protected String jobName;
   
    protected E job;
   
    public E getJob() {
        return job;
    }
    public void setJob(E job) {
        this.job = job;
    }
    /**
     * 设置工作单元名
     * @param jobName
     */
    public void setJobName(String jobName) {
        this.jobName = jobName;
    }
    /**
     * 处理后的结果
     * @throws Exception
     */
    public V getResult() throws Exception{
        return call();
    }
   
    /**
     * 执行工作单元
     */
    public abstract V call() throws Exception;
   
}

sumJob.java

package com.work.job;


public class SumJob extends Job<Integer[],Long> {
   
    public Long call() throws Exception {
        Integer[] numbers  = (Integer[])job;
        long sum = 0;
        for (Integer i:numbers) {
            sum += i;
        }
        return Long.valueOf(sum);
    }
}

JobPoolHelper.java

package com.work.helper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.work.job.Job;
/**
 * 处理工作单位的池
 * @author 郝豪
 */
public class JobPoolHelper<E,V> {
    private static ExecutorService exec;
    private CompletionService<V> completionService;
    private int cpuCoreNumber;
    private int taskCount;
    private Future<V> future;
    /**
     * 构造函数
     */
    public JobPoolHelper(){
        init();
    }
    /**
     * 初始化
     */
    public  void init(){
        cpuCoreNumber = Runtime.getRuntime().availableProcessors();
        exec = Executors.newFixedThreadPool(cpuCoreNumber);
        completionService = new ExecutorCompletionService<V>(exec);
       
    }
    /**
     * 处理任务
     */
    public void add(Job<E,V> job){
        if (!exec.isShutdown()) {
            taskCount ++;
            future = completionService.submit(job);
       
        }
    }
    /**
     * 当前执行的结果(稍微有点问题)
     * @return
     */
    public V getJobResult(){
        V  result = null;
        try {
            if(!future.isDone()){
                result = future.get();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return result;
    }
   
    /**
     * 关闭池
     */
    public void close() {
        exec.shutdown();
    }
    /**
     * 得到结果
     * @return list
     */
    public List<V> getResult() {
        List<V> list = new ArrayList<V>();
        for (int i = 0; i < taskCount; i++) {           
            try {
                list.add(completionService.take().get());
            } catch (InterruptedException e) {
                    e.printStackTrace();
            } catch (ExecutionException e) {
                    e.printStackTrace();
            }
        }       
        return list;
    }
}

Main.java

package com.work;

public class Main {
    public static void main(String[] args){
    Integer[] numbers = new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 ,111,22,333,44};
    Work<Integer[],Long> work= new SumWork();
    work.setWork(numbers);
    work.target(); //分拆任务
    work.workEnd(); //结束任务
    System.out.println(work.getResult());
    }
}
haohao   2009-05-21 19:25:22 评论:0   阅读:79   引用:0

发表评论>>

署名发表(评论可管理,不必输入下面的姓名)

姓名:

主题:

内容: 最少15个,最长1000个字符

验证码: (如不清楚,请刷新)

一切版权属于个人(转载例外)