这一篇呢,主要介绍其实现机理。 当然,秉承偶的一向的观点,让新手也能看得懂。
首先看工作的接口:1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 | public interface Work extends Serializable { /** * 返回工作类型,每个工作都有一个工作类型,包工头及工人只能处理同样类型的工作 * * @return */ String getType(); String getId(); /** * 返回后续步骤的工作,如果有,说明是复合工作,如果没有,说明是简单工作 * * @return */ Work getNextWork(); /** * 设置后续步骤工作 * * @param nextWork 后续工作 * @return 返回后续工作 */ Work setNextWork(Work nextWork); /** * 是否需要序列化 * * @return true表示工作永不丢失,false表示容器关闭即丢失 */ boolean isNeedSerialize(); /** * 设置是否需要进行序列化,如果要用到MQ,则需要设置为需要序列化 * * @param needSerialize true表示工作永不丢失,false表示容器关闭即丢失 */ void setNeedSerialize(boolean needSerialize); /** * 返回输入仓库 * * @return */ Warehouse getInputWarehouse(); /** * 设置输入仓库 * * @param inputWarehouse */ void setInputWarehouse(Warehouse inputWarehouse); /** * 设置工作状态 * * @param workStatus */ void setWorkStatus(WorkStatus workStatus); /** * 获取工作状态 * * @return */ WorkStatus getWorkStatus();} |
是不是很简单,它的实现也是同样简单:
12345678 | public class WorkDefault implements Work { private String id; private String type; private Warehouse inputWarehouse; private boolean needSerialize = false; private Work nextStepWork = null; private WorkStatus workStatus = WorkStatus.WAITING;} |
基本上就是上述属性的set,get方法。 工人的接口如下:
1234567891011121314151617181920212223242526272829 | /*** 工人,用于干具体的工作* Created by luoguo on 14-1-8.*/public interface Worker extends ParallelObject { /** * 执行工作 * * @return */ Warehouse work(Work work) throws RemoteException; /** * 是否接受工作 * 即使是同样类型的工人,有可能对工作也挑三捡四,这里给了工人一定的灵活性 * * @param work * @return true表示接受,false表示不接受 */ boolean acceptWork(Work work) throws RemoteException; /** * 返回类型 * * @return */ String getType() throws RemoteException;} |
是不是也很简单? 下面看看工头:
123456789101112131415161718192021222324252627282930313233 | /*** 包工头* 包工头用于带着一组工人并完成对应的任务* Created by luoguo on 14-1-8.*/public interface Foreman extends ParallelObject { /** * 返回执行哪种类型的工作任务 * * @return */ String getType() throws RemoteException; /** * 开始干活以完成工作 */ Warehouse work(Work work, List<Worker> workerList) throws IOException; /** * 设置工作合并器 * * @param workCombiner */ void setWorkCombiner(WorkCombiner workCombiner); /** * 设置工作分解器 * * @param workSplitter */ void setWorkSplitter(WorkSplitter workSplitter);} |
下面看看职业介绍所,呵呵,这个就复杂些了(方法多了)
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 | /*** 职介所* 职介所是分布式处理的核心场所,所有工作相关的元素都要通过职介所进行关联* Created by luoguo on 14-1-8.*/public interface JobCenter { String WORK_QUEUE = "WorkQueue"; String FOREMAN = "Foreman"; String WORKER = "Worker"; RmiServer getRmiServer(); void setRmiServer(RmiServer rmiServer); /** * 注册工人 * * @param worker */ void registerWorker(Worker worker) throws RemoteException; /** * 返回工作队列对象 * * @return */ WorkQueue getWorkQueue(); /** * 注消工人 * * @param worker */ void unregisterWorker(Worker worker) throws RemoteException; /** * 注册一份工作,工作情况不需要马上关注。因此也就不用等待,马上返回可以进行其它处理 * 如果有返回结果,可以通过异步方式,异步方式可以用后续工作的方式来指定 * * @param work */ void registerWork(Work work) throws IOException; /** * 取消工作,在工作没有分配出去之前,可以从职介所注消工作,如果工作已经分配出去,则无法注消 * * @param work */ void unregisterWork(Work work) throws RemoteException; /** * 返回指定工作的工作状态 * * @param work * @return */ WorkStatus getWorkStatus(Work work) throws RemoteException; /** * 执行一项工作,期望同步得到结果或异常 * 如果没有合适的工人或包工头进行处理,马上会抛出异常 * * @param work */ Warehouse doWork(Work work) throws IOException; /** * 注册包工头 * * @param foreman */ void registerForeman(Foreman foreman) throws RemoteException; /** * 注销包工头 * * @param foreman */ void unregisterForeMan(Foreman foreman) throws RemoteException; /** * 返回具有某种类型的空闲且愿意接受工作的工人列表 * * @return */ List<Worker> getIdleWorkerList(Work work); /** * 返回所有的工作列表 * * @return */ List<Work> getWorkList() throws RemoteException; /** * 返回某种类型的某种状态的工作列表 * * @return */ List<Work> getWorkList(String type, WorkStatus workStatus) throws RemoteException; /** * 返回组织某种工作的的空闲工头列表 * * @param type * @return */ List<Foreman> getIdleForeman(String type); /** * 自动进行匹配,如果有匹配成功的,则予以触发执行 */ void autoMatch() throws IOException; /** * 职业介绍所关门 * * @throws RemoteException */ void stop() throws RemoteException;} |
讲过了四个重要接口,现在说说实现思路: 工人、包工头都是无状态的。这有个好处是不管来多少工作,都可以进行处理;缺点是没有办法进行后续干预。 综合来说,我还是觉得无状态的比有状态的更好。 因为一开始我的实现是有状态的,甚至可以让包工头取消一个工作,包工头再让工人取消工作,但是这样会带来异常复杂的分布式状态维护。但是改成用无状态的模式,就方便多了。 在职业介绍所,有两个重要的方法,一个是doWork,即立即执行一个工作,如果找不到合适的工头和工人,就会抛出异常;另外一个是autoMatch触发职业介绍所进行一次自动匹配。之所以没有做在职业介绍所内部开启线程是为了给外部提供更方便的控制。 所以,其总体设计思想就是开个职业介绍所,工人或工头、工作就可以注册进来,由职业介绍所来撮合成合适的虚拟小组来达成任务。是不是很容易理解? 下面就是所有的接口与实现类了:
对于做并行开发的人员来说: 职业介绍所,工人,工头都不用开发,框架自带的已经足够用了。开发人员只要开发工人和工作分解合并器即可。 工人继承AbstractWorker之后,只有一个方法实现即可。工作分解一个方法,工作合并一个方法,其它的都交给Tiny并行计算框架吧。 总结: 此框架对并行计算的参与者进行了分解,分解为职业介绍所、工头、工人及工作,职业介绍所是中心,职业介绍所停止运行,将无法进行并行计算。工头、工人是可以在任意计算机的,工作是可以在任意节点添加的,由于框架提供了工作序列化功能,因此只要设置工作需要序列化是true,此工作将一直存在,直到被完成,利用此特性可以方便的实现简单的MQ。同时为方便业务实现,工人有抽象类,建议直接继承抽象类实现要关业务方法即可。 注意:目前在执行过程中工人注销或停止服务,会导致整个工作执行失败,下次继续进行执行,后续拟改成考虑用其它工人继续工作。 今天比较忙,写得比较匆忙,如果有不清楚的可以在下面提问。