source: DCWoRMS/trunk/build/classes/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 539

Revision 539, 4.5 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
RevLine 
[477]1package schedframe.scheduling.manager.tasks;
2
[539]3import gridsim.dcworms.DCWormsTags;
[477]4
5import java.util.ArrayList;
6import java.util.List;
7
8import org.apache.commons.lang.ArrayUtils;
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
12import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
13
[539]14import dcworms.schedframe.scheduling.ExecTask;
15
[477]16import qcg.shared.constants.BrokerConstants;
[539]17import schedframe.ExecutablesList;
[477]18import schedframe.scheduling.tasks.JobInterface;
19import schedframe.scheduling.tasks.Task;
20
21public class JobRegistryImpl extends AbstractJobRegistry {
22       
23        private static final long serialVersionUID = 8030555906990767342L;
24
25        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
26
27        private String context;
28
[539]29        //TO DO - consider data structure
30        protected static final ExecutablesList executables = new ExecutablesList();
31        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
32        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
[477]33
[539]34        public JobRegistryImpl(String context) {
35                this.context = context;
[477]36        }
37
[539]38        public boolean addExecTask(ExecTask newTask) {
39                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) {
40                        synchronized (executables)  {
41                                executables.add(newTask);
[477]42                        }
43                        return true;
44                }
45                return false;
46        }
47
[539]48        public ExecutablesList getExecutableTasks() {
49                return executables;
50        }
[477]51        public List<ExecTask> getTasks(int status) {
52                List<ExecTask> taskList = new ArrayList<ExecTask>();
[539]53                synchronized (executables)  {
54                        for (ExecTask task: executables) {
[477]55                                if (task.getStatus() == status) {
56                                        List<String> visitedResource = task.getVisitedResources();
[539]57                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
[477]58                                                taskList.add(task);
59                                        }
60                                }
61                        }
62                }
63                return taskList;
64        }
65
66        public List<ExecTask> getQueuedTasks() {
[539]67                return getTasks(DCWormsTags.QUEUED);
[477]68        }
69       
70        public List<ExecTask> getRunningTasks() {
[539]71                return getTasks(DCWormsTags.INEXEC);
[477]72        }
73       
74        public List<ExecTask> getReadyTasks() {
[539]75                return getTasks(DCWormsTags.READY);
[477]76        }
77       
78        public List<ExecTask> getFinishedTasks() {
[539]79                return getTasks(DCWormsTags.SUCCESS);
[477]80        }
81       
[539]82        public ExecTask getExecutable(String jobId, String taskId){
83                synchronized (executables)  {
84                        for (ExecTask task : executables) {
85                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
[477]86                                        return task;
87                                }
88                        }
89                }
90                return null;
91        }
92
93        @SuppressWarnings("unchecked")
[539]94        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) {
95                List<Task> availableTasks = new ArrayList<Task>();
[477]96                List<Task> waitingTasks = new ArrayList<Task>();
97               
98                for(int i = 0; i < wuList.size(); i++){
99                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
100                        waitingTasks.addAll((List<Task>)wu.getTask());
101                }
102
[539]103                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
104                return availableTasks;
[477]105        }
106
107
[539]108        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
[477]109               
[539]110                List<Task> availableTasks = new ArrayList<Task>();
111                int size = tasks.size();
[477]112               
113                for(int i = 0; i < size; i++){
114                        int parCnt;
[539]115                        int previousTaskSucceedCnt = 0;
[477]116                        Task task = tasks.get(i);
117                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
118                                continue;
119                        try{           
120                                parCnt = task.getDescription().getWorkflow().getParentCount();
121                        } catch(Exception e){
122                                parCnt = 0;
123                        }
[539]124                        if(parCnt == 0){
125                                availableTasks.add(task);
[477]126                        }
[539]127                        else {
[477]128                                for(int j = 0; j < parCnt; j++){
129                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
130                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
131                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
132                                                        break;
133                                                }
134                                        }
[539]135                                        previousTaskSucceedCnt++;
[477]136                                }
137                               
[539]138                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
139                                        availableTasks.add(task);
140                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
141                                        availableTasks.add(task);
142                                else if (previousTaskSucceedCnt == parCnt)
143                                        availableTasks.add(task);
[477]144                        }
145                }               
[539]146                return availableTasks;
[477]147        }
148       
149        private boolean checkTaskCompletion(String jobID, String taskID){
150                JobInterface<?> job = getJobInfo(jobID);
151                try {
152                        if(job.getTask(taskID).isFinished())
153                                return true;
154                } catch (NoSuchFieldException e) {
155                        //e.printStackTrace();
156                }
157                return false;
158        }
[539]159
[477]160}
Note: See TracBrowser for help on using the repository browser.