source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1548

Revision 1548, 8.7 KB checked in by wojtekp, 10 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.Arrays;
7import java.util.List;
8import java.util.Set;
9
10import org.qcg.broker.schemas.resreqs.ParentType;
11import org.qcg.broker.schemas.resreqs.Workflow;
12import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
13
14import qcg.shared.constants.BrokerConstants;
15import schedframe.ExecutablesList;
16import schedframe.resources.computing.ComputingResource;
17import schedframe.scheduling.tasks.JobInterface;
18import schedframe.scheduling.tasks.Task;
19import dcworms.schedframe.scheduling.ExecTask;
20
21public class JobRegistryImpl extends AbstractJobRegistry {
22
23        private String context;
24        private ComputingResource cr;
25        //TO DO - consider data structure
26        protected static final ExecutablesList executables = new ExecutablesList();
27
28        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
29        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
30
31        public JobRegistryImpl(String context) {
32                this.context = context;
33        }
34       
35        public JobRegistryImpl(ComputingResource cr) {
36                this.cr = cr;
37        }
38
39        public JobRegistryImpl() {
40                this("");
41        }
42       
43        public boolean addExecTask(ExecTask newTask) {
44                if(getTask(newTask.getJobId(), newTask.getId()) == null) {
45                        synchronized (executables) {
46                                executables.add(newTask);
47                        }
48                        return true;
49                }
50                return false;
51        }
52
53        public ExecutablesList getTasks() {
54                return executables;
55        }
56        public List<ExecTask> getTasks(int status) {
57                List<ExecTask> taskList = new ArrayList<ExecTask>();
58                synchronized (executables)  {
59                        for (ExecTask task: executables) {
60                                if (task.getStatus() == status) {
61                                        if(cr != null){
62                                                if(!task.getAllocatedResources().isEmpty()){
63                                                        Set<ComputingResource> visitedResource = task.getAllocatedResources().getLast().getResources();
64                                                        if(visitedResource.contains(cr)){
65                                                                taskList.add(task);
66                                                        } else {
67                                                                for(ComputingResource res: visitedResource){
68                                                                        if(cr.contains(res)){
69                                                                                taskList.add(task);
70                                                                                break;
71                                                                        }
72                                                                }
73                                                        }
74                                                }
75                                        } else {
76                                                if(!task.getAllocatedResources().isEmpty()){
77                                                        Set<String> visitedResource = task.getAllocatedResources().getLast().getResourceNames();
78                                                        for(String res: visitedResource){
79                                                                if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){
80                                                                        taskList.add(task);
81                                                                        break;
82                                                                }
83                                                        }       
84                                                }
85
86                                                if(task.getSchedulerName().equals(context)) {
87                                                        taskList.add(task);
88                                                }
89                                        }
90                                }
91                        }
92                }
93
94                return taskList;
95                //return getTasks(Arrays.asList(status), true);
96        }
97
98        public List<ExecTask> getTasks(List<Integer> statusList, boolean match) {
99                List<ExecTask> taskList = new ArrayList<ExecTask>();
100                synchronized (executables)  {
101                        for (ExecTask task: executables) {
102                                for(int status: statusList){
103                                        if (compareStatus(task.getStatus(), status, match)) {
104                                                if(cr != null){
105                                                        if(!task.getAllocatedResources().isEmpty()){
106                                                                Set<ComputingResource> visitedResource = task.getAllocatedResources().getLast().getResources();
107                                                                if(visitedResource.contains(cr)){
108                                                                        taskList.add(task);
109                                                                } else {
110                                                                        for(ComputingResource res: visitedResource){
111                                                                                if(cr.contains(res)){
112                                                                                        taskList.add(task);
113                                                                                        break;
114                                                                                }
115                                                                        }
116                                                                }
117                                                        }
118                                                } else {
119                                                        if(!task.getAllocatedResources().isEmpty()){
120                                                                Set<String> visitedResource = task.getAllocatedResources().getLast().getResourceNames();
121                                                                for(String res: visitedResource){
122                                                                        if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){
123                                                                                taskList.add(task);
124                                                                                break;
125                                                                        }
126                                                                }       
127                                                        }
128
129                                                        if(task.getSchedulerName().equals(context)) {
130                                                                taskList.add(task);
131                                                        }
132                                                }
133                                        }
134                                }
135                        }
136                }
137
138                return taskList;
139        }
140       
141        private boolean compareStatus(int taskStatus, int status, boolean match){
142                if(match){
143                        if(taskStatus == status)
144                                return true;
145                } else {
146                        if(taskStatus !=status)
147                                return true;
148                }
149                return false;
150        }
151       
152        public List<ExecTask> getQueuedTasks() {
153                return getTasks(DCWormsTags.QUEUED);
154        }
155       
156        public List<ExecTask> getRunningTasks() {
157                return getTasks(DCWormsTags.INEXEC);
158        }
159       
160        public List<ExecTask> getReadyTasks() {
161                return getTasks(DCWormsTags.READY);
162        }
163       
164        public List<ExecTask> getFinishedTasks() {
165                return getTasks(DCWormsTags.SUCCESS);
166        }
167       
168        public ExecTask getTask(String jobId, String taskId){
169                synchronized (executables)  {
170                        for (ExecTask task : executables) {
171                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
172                                        return task;
173                                }
174                        }
175                }
176                return null;
177        }
178
179        @SuppressWarnings("unchecked")
180        public List<Task> getAvailableTasks(List<JobInterface<?>> jobList) {
181                List<Task> availableTasks = new ArrayList<Task>();
182                List<Task> waitingTasks = new ArrayList<Task>();
183               
184                for(int i = 0; i < jobList.size(); i++){
185                        JobInterface<?> job = (JobInterface<?>)jobList.get(i);
186                        waitingTasks.addAll((List<Task>)job.getTask());
187                }
188
189                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
190                return availableTasks;
191        }
192
193        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
194               
195                List<Task> availableTasks = new ArrayList<Task>();
196                int size = tasks.size();
197               
198                for(int i = 0; i < size; i++){
199                        int parCnt;
200                        int previousTaskSucceedCnt = 0;
201                        Task task = tasks.get(i);
202                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
203                                continue;
204                        //the following procedure supports only one nested structure
205                        Workflow w = task.getDescription().getWorkflow();
206                        if (w == null){
207                                availableTasks.add(task);
208                                continue;
209                        }
210                        if(w.getAnd() != null) {
211                                parCnt = w.getAnd().getParentOpTypeItemCount();
212                                if(parCnt == 0)
213                                {
214                                        availableTasks.add(task);
215                                }
216                                else
217                                {
218                                        for(int j = 0; j < parCnt; j++){
219                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
220                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
221                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
222                                                                break;
223                                                        }
224                                                }
225                                                previousTaskSucceedCnt++;
226                                        }
227
228                                        if(previousTaskSucceedCnt == parCnt)
229                                                availableTasks.add(task);
230                                }
231                        }
232                        else if(w.getOr() != null) {
233                                parCnt = w.getOr().getParentOpTypeItemCount();
234                                if(parCnt == 0)
235                                {
236                                        availableTasks.add(task);
237                                }
238                                else
239                                {
240                                        for(int j = 0; j < parCnt; j++){
241                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
242                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
243                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
244                                                                continue;
245                                                        }
246                                                }
247                                                previousTaskSucceedCnt++;
248                                        }
249
250                                        if(previousTaskSucceedCnt > 0)
251                                                availableTasks.add(task);
252                                }
253                        }
254                        else {
255                                parCnt = w.getParentCount();
256                                if(parCnt == 0)
257                                {
258                                        availableTasks.add(task);
259                                }
260                                else
261                                {
262                                        for(int j = 0; j < parCnt; j++){
263                                                ParentType par = w.getParent(j);
264                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
265                                                        if(par.getContent().contains("_")){
266                                                                if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){
267                                                                        continue;
268                                                                }
269                                                        }
270                                                        else if(!checkTaskCompletion(task.getJobId(), par.getContent())){
271                                                                continue;
272                                                        }
273                                                }
274                                                previousTaskSucceedCnt++;
275                                        }
276
277                                        if(previousTaskSucceedCnt == parCnt)
278                                                availableTasks.add(task);
279                                }
280                        }
281                       
282                        /*try{         
283                                parCnt = task.getDescription().getWorkflow().getParentCount();
284                        } catch(Exception e){
285                                parCnt = 0;
286                        }
287                        if(parCnt == 0){
288                                availableTasks.add(task);
289                        }
290                        else {
291                                for(int j = 0; j < parCnt; j++){
292                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
293                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
294                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
295                                                        break;
296                                                }
297                                        }
298                                        previousTaskSucceedCnt++;
299                                }
300                               
301                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
302                                        availableTasks.add(task);
303                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
304                                        availableTasks.add(task);
305                                else if (previousTaskSucceedCnt == parCnt)
306                                        availableTasks.add(task);
307                        }*/
308                }               
309                return availableTasks;
310        }
311       
312        private boolean checkTaskCompletion(String jobID, String taskID){
313                JobInterface<?> job = getJobInfo(jobID);
314                try {
315                        if(job.getTask(taskID).isFinished())
316                                return true;
317                } catch (NoSuchFieldException e) {
318                        //e.printStackTrace();
319                }
320                return false;
321        }
322       
323        public static void reset(){
324                jobs.clear();
325                executables.clear();
326        }
327
328}
Note: See TracBrowser for help on using the repository browser.