source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1595

Revision 1595, 9.5 KB checked in by wojtekp, 8 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8import java.util.Set;
9
10import org.qcg.broker.schemas.resreqs.ParentType;
11import org.qcg.broker.schemas.resreqs.Workflow;
12import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
13
14import qcg.shared.constants.BrokerConstants;
15import schedframe.ExecutablesList;
16import schedframe.resources.computing.ComputingResource;
17import schedframe.scheduling.tasks.JobInterface;
18import schedframe.scheduling.tasks.Task;
19import dcworms.schedframe.scheduling.ExecTask;
20
21public class JobRegistryImpl extends AbstractJobRegistry {
22
23        private String context;
24        private ComputingResource cr;
25        //TO DO - consider data structure
26        protected static final ExecutablesList executables = new ExecutablesList();
27
28        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
29        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
30
31        public JobRegistryImpl(String context) {
32                this.context = context;
33        }
34       
35        public JobRegistryImpl(ComputingResource cr) {
36                this.cr = cr;
37        }
38
39        public JobRegistryImpl() {
40                this("");
41        }
42       
43        public boolean addExecTask(ExecTask newTask) {
44                if(getTask(newTask.getJobId(), newTask.getId()) == null) {
45                        synchronized (executables) {
46                                executables.add(newTask);
47                        }
48                        return true;
49                }
50                return false;
51        }
52
53        public ExecutablesList getTasks() {
54                return executables;
55        }
56        public List<ExecTask> getTasks(int status) {
57                List<ExecTask> taskList = new ArrayList<ExecTask>();
58                synchronized (executables)  {
59                        for (ExecTask task: executables) {
60                                if (task.getStatus() == status) {
61                                        if(cr != null){
62                                                if(!task.getAllocatedResources().isEmpty()){
63                                                        Set<ComputingResource> visitedResource = task.getAllocatedResources().getLast().getResources();
64                                                        if(visitedResource.contains(cr)){
65                                                                taskList.add(task);
66                                                        } else {
67                                                                for(ComputingResource res: visitedResource){
68                                                                        if(cr.contains(res)){
69                                                                                taskList.add(task);
70                                                                                break;
71                                                                        }
72                                                                }
73                                                        }
74                                                }
75                                        } else {
76                                                if(!task.getAllocatedResources().isEmpty()){
77                                                        Set<String> visitedResource = task.getAllocatedResources().getLast().getResourceNames();
78                                                        for(String res: visitedResource){
79                                                                if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){
80                                                                        taskList.add(task);
81                                                                        break;
82                                                                }
83                                                        }       
84                                                }
85
86                                                if(task.getSchedulerName().equals(context)) {
87                                                        taskList.add(task);
88                                                }
89                                        }
90                                }
91                        }
92                }
93
94                return taskList;
95                //return getTasks(Arrays.asList(status), true);
96        }
97
98        public List<ExecTask> getTasks(List<Integer> statusList, boolean match) {
99                List<ExecTask> taskList = new ArrayList<ExecTask>();
100                synchronized (executables)  {
101                        for (ExecTask task: executables) {
102                                for(int status: statusList){
103                                        if (compareStatus(task.getStatus(), status, match)) {
104                                                if(cr != null){
105                                                        if(!task.getAllocatedResources().isEmpty()){
106                                                                Set<ComputingResource> visitedResource = task.getAllocatedResources().getLast().getResources();
107                                                                if(visitedResource.contains(cr)){
108                                                                        taskList.add(task);
109                                                                } else {
110                                                                        for(ComputingResource res: visitedResource){
111                                                                                if(cr.contains(res)){
112                                                                                        taskList.add(task);
113                                                                                        break;
114                                                                                }
115                                                                        }
116                                                                }
117                                                        }
118                                                } else {
119                                                        if(!task.getAllocatedResources().isEmpty()){
120                                                                Set<String> visitedResource = task.getAllocatedResources().getLast().getResourceNames();
121                                                                for(String res: visitedResource){
122                                                                        if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){
123                                                                                taskList.add(task);
124                                                                                break;
125                                                                        }
126                                                                }       
127                                                        }
128
129                                                        if(task.getSchedulerName().equals(context)) {
130                                                                taskList.add(task);
131                                                        }
132                                                }
133                                        }
134                                }
135                        }
136                }
137
138                return taskList;
139        }
140       
141        private boolean compareStatus(int taskStatus, int status, boolean match){
142                if(match){
143                        if(taskStatus == status)
144                                return true;
145                } else {
146                        if(taskStatus !=status)
147                                return true;
148                }
149                return false;
150        }
151       
152        public List<ExecTask> getQueuedTasks() {
153                return getTasks(DCWormsTags.QUEUED);
154        }
155       
156        public List<ExecTask> getRunningTasks() {
157                return getTasks(DCWormsTags.INEXEC);
158        }
159       
160        public List<ExecTask> getReadyTasks() {
161                return getTasks(DCWormsTags.READY);
162        }
163       
164        public List<ExecTask> getFinishedTasks() {
165                return getTasks(DCWormsTags.SUCCESS);
166        }
167       
168        public ExecTask getTask(String jobId, String taskId){
169                synchronized (executables)  {
170                        for (ExecTask task : executables) {
171                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
172                                        return task;
173                                }
174                        }
175                }
176                return null;
177        }
178
179        @SuppressWarnings("unchecked")
180        public List<Task> getAvailableTasks(List<JobInterface<?>> jobList) {
181                List<Task> availableTasks = new ArrayList<Task>();
182                List<Task> waitingTasks = new ArrayList<Task>();
183               
184                for(int i = 0; i < jobList.size(); i++){
185                        JobInterface<?> job = (JobInterface<?>)jobList.get(i);
186                        waitingTasks.addAll((List<Task>)job.getTask());
187                }
188
189                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
190                return availableTasks;
191        }
192
193        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
194               
195                List<Task> availableTasks = new ArrayList<Task>();
196                int size = tasks.size();
197               
198                for(int i = 0; i < size; i++){
199                        int parCnt;
200                        int previousTaskSucceedCnt = 0;
201                        Task task = tasks.get(i);
202                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
203                                continue;
204                        //the following procedure supports only one nested structure
205                        Workflow w = task.getDescription().getWorkflow();
206                        if (w == null){
207                                availableTasks.add(task);
208                                continue;
209                        }
210                        if(w.getAnd() != null) {
211                                parCnt = w.getAnd().getParentOpTypeItemCount();
212                                if(parCnt == 0) {
213                                        availableTasks.add(task);
214                                } else {
215                                        for(int j = 0; j < parCnt; j++){
216                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
217                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
218                                                        if(!checkTaskState(task.getJobId(), par.getContent(), TaskStatesName.FINISHED)){
219                                                        //if(!checkTaskCompletion(task.getJobId(), par.getContent())){
220                                                                break;
221                                                        }
222                                                }
223                                                previousTaskSucceedCnt++;
224                                        }
225
226                                        if(previousTaskSucceedCnt == parCnt)
227                                                availableTasks.add(task);
228                                }
229                        }
230                        else if(w.getOr() != null) {
231                                parCnt = w.getOr().getParentOpTypeItemCount();
232                                if(parCnt == 0) {
233                                        availableTasks.add(task);
234                                } else {
235                                        for(int j = 0; j < parCnt; j++){
236                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
237                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
238                                                        if(!checkTaskState(task.getJobId(), par.getContent(), TaskStatesName.FINISHED)){
239                                                        //if(!checkTaskCompletion(task.getJobId(), par.getContent())){
240                                                                continue;
241                                                        }
242                                                }
243                                                previousTaskSucceedCnt++;
244                                        }
245
246                                        if(previousTaskSucceedCnt > 0)
247                                                availableTasks.add(task);
248                                }
249                        }
250                        else {
251                                parCnt = w.getParentCount();
252                                if(parCnt == 0) {
253                                        availableTasks.add(task);
254                                } else {
255                                        for(int j = 0; j < parCnt; j++){
256                                                ParentType par = w.getParent(j);
257                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
258                                                        if(par.getContent().contains("_")){
259                                                                if(!checkTaskState(par.getContent().split("_")[0], par.getContent().split("_")[1], TaskStatesName.FINISHED)){
260                                                                //if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){
261                                                                        continue;
262                                                                }
263                                                        }                                                       
264                                                        else if(!checkTaskState(task.getJobId(), par.getContent(), TaskStatesName.FINISHED)){
265                                                        //else if(!checkTaskCompletion(task.getJobId(), par.getContent())){
266                                                                continue;
267                                                        }
268                                                }
269                                                previousTaskSucceedCnt++;
270                                        }
271
272                                        if(previousTaskSucceedCnt == parCnt)
273                                                availableTasks.add(task);
274                                }
275                        }
276                       
277                        /*try{         
278                                parCnt = task.getDescription().getWorkflow().getParentCount();
279                        } catch(Exception e){
280                                parCnt = 0;
281                        }
282                        if(parCnt == 0){
283                                availableTasks.add(task);
284                        }
285                        else {
286                                for(int j = 0; j < parCnt; j++){
287                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
288                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
289                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
290                                                        break;
291                                                }
292                                        }
293                                        previousTaskSucceedCnt++;
294                                }
295                               
296                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
297                                        availableTasks.add(task);
298                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
299                                        availableTasks.add(task);
300                                else if (previousTaskSucceedCnt == parCnt)
301                                        availableTasks.add(task);
302                        }*/
303                }               
304                return availableTasks;
305        }
306       
307        private boolean checkTaskCompletion(String jobID, String taskID){
308                JobInterface<?> job = getJobInfo(jobID);
309                try {
310                        if(job.getTask(taskID).isFinished())
311                                return true;
312                } catch (NoSuchFieldException e) {
313                        //e.printStackTrace();
314                }
315                return false;
316        }
317       
318        private boolean checkTaskState(String jobID, String taskID, TaskStatesName taskState){
319                JobInterface<?> job = getJobInfo(jobID);
320                try {
321                        switch (taskState){
322                                case QUEUED:{
323                                        if(job.getTask(taskID).getStatus() == (int)BrokerConstants.JOB_STATUS_SUBMITTED)
324                                                return true;
325                                }
326                                case FINISHED:{
327                                        if(job.getTask(taskID).isFinished())
328                                                return true;
329                                }
330                        default:
331                                break;
332                        }
333
334                } catch (NoSuchFieldException e) {
335                        //e.printStackTrace();
336                }
337                return false;
338        }
339       
340        public static void reset(){
341                jobs.clear();
342                executables.clear();
343        }
344
345}
Note: See TracBrowser for help on using the repository browser.