source: DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 493

Revision 493, 4.5 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7
8import org.apache.commons.lang.ArrayUtils;
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
12import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
13
14import dcworms.schedframe.scheduling.ExecTask;
15
16import qcg.shared.constants.BrokerConstants;
17import schedframe.ExecutablesList;
18import schedframe.scheduling.tasks.JobInterface;
19import schedframe.scheduling.tasks.Task;
20
21public class JobRegistryImpl extends AbstractJobRegistry {
22       
23        private static final long serialVersionUID = 8030555906990767342L;
24
25        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
26
27        private String context;
28
29        //TO DO - consider data structure
30        protected static final ExecutablesList executables = new ExecutablesList();
31        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
32        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
33
34        public JobRegistryImpl(String context) {
35                this.context = context;
36        }
37
38        public boolean addExecTask(ExecTask newTask) {
39                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) {
40                        synchronized (executables)  {
41                                executables.add(newTask);
42                        }
43                        return true;
44                }
45                return false;
46        }
47
48        public ExecutablesList getExecutableTasks() {
49                return executables;
50        }
51        public List<ExecTask> getTasks(int status) {
52                List<ExecTask> taskList = new ArrayList<ExecTask>();
53                synchronized (executables)  {
54                        for (ExecTask task: executables) {
55                                if (task.getStatus() == status) {
56                                        List<String> visitedResource = task.getVisitedResources();
57                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
58                                                taskList.add(task);
59                                        }
60                                }
61                        }
62                }
63                return taskList;
64        }
65
66        public List<ExecTask> getQueuedTasks() {
67                return getTasks(DCWormsTags.QUEUED);
68        }
69       
70        public List<ExecTask> getRunningTasks() {
71                return getTasks(DCWormsTags.INEXEC);
72        }
73       
74        public List<ExecTask> getReadyTasks() {
75                return getTasks(DCWormsTags.READY);
76        }
77       
78        public List<ExecTask> getFinishedTasks() {
79                return getTasks(DCWormsTags.SUCCESS);
80        }
81       
82        public ExecTask getExecutable(String jobId, String taskId){
83                synchronized (executables)  {
84                        for (ExecTask task : executables) {
85                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
86                                        return task;
87                                }
88                        }
89                }
90                return null;
91        }
92
93        @SuppressWarnings("unchecked")
94        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) {
95                List<Task> availableTasks = new ArrayList<Task>();
96                List<Task> waitingTasks = new ArrayList<Task>();
97               
98                for(int i = 0; i < wuList.size(); i++){
99                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
100                        waitingTasks.addAll((List<Task>)wu.getTask());
101                }
102
103                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
104                return availableTasks;
105        }
106
107
108        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
109               
110                List<Task> availableTasks = new ArrayList<Task>();
111                int size = tasks.size();
112               
113                for(int i = 0; i < size; i++){
114                        int parCnt;
115                        int previousTaskSucceedCnt = 0;
116                        Task task = tasks.get(i);
117                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
118                                continue;
119                        try{           
120                                parCnt = task.getDescription().getWorkflow().getParentCount();
121                        } catch(Exception e){
122                                parCnt = 0;
123                        }
124                        if(parCnt == 0){
125                                availableTasks.add(task);
126                        }
127                        else {
128                                for(int j = 0; j < parCnt; j++){
129                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
130                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
131                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
132                                                        break;
133                                                }
134                                        }
135                                        previousTaskSucceedCnt++;
136                                }
137                               
138                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
139                                        availableTasks.add(task);
140                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
141                                        availableTasks.add(task);
142                                else if (previousTaskSucceedCnt == parCnt)
143                                        availableTasks.add(task);
144                        }
145                }               
146                return availableTasks;
147        }
148       
149        private boolean checkTaskCompletion(String jobID, String taskID){
150                JobInterface<?> job = getJobInfo(jobID);
151                try {
152                        if(job.getTask(taskID).isFinished())
153                                return true;
154                } catch (NoSuchFieldException e) {
155                        //e.printStackTrace();
156                }
157                return false;
158        }
159
160}
Note: See TracBrowser for help on using the repository browser.