source: DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 481

Revision 481, 4.4 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.gssim.DCWormsTags;
4import gssim.schedframe.scheduling.ExecTask;
5
6import java.util.ArrayList;
7import java.util.List;
8
9import org.apache.commons.lang.ArrayUtils;
10import org.apache.commons.logging.Log;
11import org.apache.commons.logging.LogFactory;
12import org.qcg.broker.schemas.resreqs.ParentType;
13import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
14
15import qcg.shared.constants.BrokerConstants;
16import schedframe.ExecutablesList;
17import schedframe.scheduling.tasks.JobInterface;
18import schedframe.scheduling.tasks.Task;
19
20public class JobRegistryImpl extends AbstractJobRegistry {
21       
22        private static final long serialVersionUID = 8030555906990767342L;
23
24        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
25
26        private String context;
27
28        //TO DO - consider data structure
29        protected static final ExecutablesList executables = new ExecutablesList();
30        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
31        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
32
33        public JobRegistryImpl(String context) {
34                this.context = context;
35        }
36
37        public boolean addExecTask(ExecTask newTask) {
38                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) {
39                        synchronized (executables)  {
40                                executables.add(newTask);
41                        }
42                        return true;
43                }
44                return false;
45        }
46
47        public ExecutablesList getExecutableTasks() {
48                return executables;
49        }
50        public List<ExecTask> getTasks(int status) {
51                List<ExecTask> taskList = new ArrayList<ExecTask>();
52                synchronized (executables)  {
53                        for (ExecTask task: executables) {
54                                if (task.getStatus() == status) {
55                                        List<String> visitedResource = task.getVisitedResources();
56                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
57                                                taskList.add(task);
58                                        }
59                                }
60                        }
61                }
62                return taskList;
63        }
64
65        public List<ExecTask> getQueuedTasks() {
66                return getTasks(DCWormsTags.QUEUED);
67        }
68       
69        public List<ExecTask> getRunningTasks() {
70                return getTasks(DCWormsTags.INEXEC);
71        }
72       
73        public List<ExecTask> getReadyTasks() {
74                return getTasks(DCWormsTags.READY);
75        }
76       
77        public List<ExecTask> getFinishedTasks() {
78                return getTasks(DCWormsTags.SUCCESS);
79        }
80       
81        public ExecTask getExecutable(String jobId, String taskId){
82                synchronized (executables)  {
83                        for (ExecTask task : executables) {
84                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId)==0) {
85                                        return task;
86                                }
87                        }
88                }
89                return null;
90        }
91
92        @SuppressWarnings("unchecked")
93        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) {
94                List<Task> availableTasks = new ArrayList<Task>();
95                List<Task> waitingTasks = new ArrayList<Task>();
96               
97                for(int i = 0; i < wuList.size(); i++){
98                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
99                        waitingTasks.addAll((List<Task>)wu.getTask());
100                }
101
102                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
103                return availableTasks;
104        }
105
106
107        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
108               
109                List<Task> availableTasks = new ArrayList<Task>();
110                int size = tasks.size();
111               
112                for(int i = 0; i < size; i++){
113                        int parCnt;
114                        int previousTaskSucceedCnt = 0;
115                        Task task = tasks.get(i);
116                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
117                                continue;
118                        try{           
119                                parCnt = task.getDescription().getWorkflow().getParentCount();
120                        } catch(Exception e){
121                                parCnt = 0;
122                        }
123                        if(parCnt == 0){
124                                availableTasks.add(task);
125                        }
126                        else {
127                                for(int j = 0; j < parCnt; j++){
128                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
129                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
130                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
131                                                        break;
132                                                }
133                                        }
134                                        previousTaskSucceedCnt++;
135                                }
136                               
137                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
138                                        availableTasks.add(task);
139                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
140                                        availableTasks.add(task);
141                                else if (previousTaskSucceedCnt == parCnt)
142                                        availableTasks.add(task);
143                        }
144                }               
145                return availableTasks;
146        }
147       
148        private boolean checkTaskCompletion(String jobID, String taskID){
149                JobInterface<?> job = getJobInfo(jobID);
150                try {
151                        if(job.getTask(taskID).isFinished())
152                                return true;
153                } catch (NoSuchFieldException e) {
154                        //e.printStackTrace();
155                }
156                return false;
157        }
158
159}
Note: See TracBrowser for help on using the repository browser.