source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1434

Revision 1434, 6.9 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Set;
8
9import org.qcg.broker.schemas.resreqs.ParentType;
10import org.qcg.broker.schemas.resreqs.Workflow;
11import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
12
13import qcg.shared.constants.BrokerConstants;
14import schedframe.ExecutablesList;
15import schedframe.resources.computing.ComputingResource;
16import schedframe.scheduling.tasks.JobInterface;
17import schedframe.scheduling.tasks.Task;
18import dcworms.schedframe.scheduling.ExecTask;
19
20public class JobRegistryImpl extends AbstractJobRegistry {
21
22        private String context;
23        private ComputingResource cr;
24        //TO DO - consider data structure
25        protected static final ExecutablesList executables = new ExecutablesList();
26
27        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
28        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
29
30        public JobRegistryImpl(String context) {
31                this.context = context;
32        }
33       
34        public JobRegistryImpl(ComputingResource cr) {
35                this.cr = cr;
36        }
37
38        public JobRegistryImpl() {
39                this("");
40        }
41       
42        public boolean addExecTask(ExecTask newTask) {
43                if(getTask(newTask.getJobId(), newTask.getId()) == null) {
44                        synchronized (executables) {
45                                executables.add(newTask);
46                        }
47                        return true;
48                }
49                return false;
50        }
51
52        public ExecutablesList getTasks() {
53                return executables;
54        }
55        public List<ExecTask> getTasks(int status) {
56                List<ExecTask> taskList = new ArrayList<ExecTask>();
57                synchronized (executables)  {
58                        for (ExecTask task: executables) {
59                                if (task.getStatus() == status) {
60                                        if(cr != null){
61                                                Set<ComputingResource> visitedResource = task.getAllocatedResources().getLast().getResources();
62                                                if(visitedResource.contains(cr)){
63                                                        taskList.add(task);
64                                                } else {
65                                                        for(ComputingResource res: visitedResource){
66                                                                if(cr.contains(res)){
67                                                                        taskList.add(task);
68                                                                        break;
69                                                                }
70                                                        }
71                                                }
72                                        } else {
73                                                Set<String> visitedResource = task.getAllocatedResources().getLast().getResourceNames();
74                                                for(String res: visitedResource){
75                                                        if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){
76                                                                taskList.add(task);
77                                                                break;
78                                                        }
79                                                }
80
81                                                if(task.getSchedulerName().equals(context)) {
82                                                        taskList.add(task);
83                                                }
84                                        }
85                                }
86                        }
87                }
88
89                return taskList;
90        }
91
92        public List<ExecTask> getQueuedTasks() {
93                return getTasks(DCWormsTags.QUEUED);
94        }
95       
96        public List<ExecTask> getRunningTasks() {
97                return getTasks(DCWormsTags.INEXEC);
98        }
99       
100        public List<ExecTask> getReadyTasks() {
101                return getTasks(DCWormsTags.READY);
102        }
103       
104        public List<ExecTask> getFinishedTasks() {
105                return getTasks(DCWormsTags.SUCCESS);
106        }
107       
108        public ExecTask getTask(String jobId, String taskId){
109                synchronized (executables)  {
110                        for (ExecTask task : executables) {
111                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
112                                        return task;
113                                }
114                        }
115                }
116                return null;
117        }
118
119        @SuppressWarnings("unchecked")
120        public List<Task> getAvailableTasks(List<JobInterface<?>> jobList) {
121                List<Task> availableTasks = new ArrayList<Task>();
122                List<Task> waitingTasks = new ArrayList<Task>();
123               
124                for(int i = 0; i < jobList.size(); i++){
125                        JobInterface<?> job = (JobInterface<?>)jobList.get(i);
126                        waitingTasks.addAll((List<Task>)job.getTask());
127                }
128
129                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
130                return availableTasks;
131        }
132
133        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
134               
135                List<Task> availableTasks = new ArrayList<Task>();
136                int size = tasks.size();
137               
138                for(int i = 0; i < size; i++){
139                        int parCnt;
140                        int previousTaskSucceedCnt = 0;
141                        Task task = tasks.get(i);
142                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
143                                continue;
144                        //the following procedure supports only one nested structure
145                        Workflow w = task.getDescription().getWorkflow();
146                        if (w == null){
147                                availableTasks.add(task);
148                                continue;
149                        }
150                        if(w.getAnd() != null) {
151                                parCnt = w.getAnd().getParentOpTypeItemCount();
152                                if(parCnt == 0)
153                                {
154                                        availableTasks.add(task);
155                                }
156                                else
157                                {
158                                        for(int j = 0; j < parCnt; j++){
159                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
160                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
161                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
162                                                                break;
163                                                        }
164                                                }
165                                                previousTaskSucceedCnt++;
166                                        }
167
168                                        if(previousTaskSucceedCnt == parCnt)
169                                                availableTasks.add(task);
170                                }
171                        }
172                        else if(w.getOr() != null) {
173                                parCnt = w.getOr().getParentOpTypeItemCount();
174                                if(parCnt == 0)
175                                {
176                                        availableTasks.add(task);
177                                }
178                                else
179                                {
180                                        for(int j = 0; j < parCnt; j++){
181                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
182                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
183                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
184                                                                continue;
185                                                        }
186                                                }
187                                                previousTaskSucceedCnt++;
188                                        }
189
190                                        if(previousTaskSucceedCnt > 0)
191                                                availableTasks.add(task);
192                                }
193                        }
194                        else {
195                                parCnt = w.getParentCount();
196                                if(parCnt == 0)
197                                {
198                                        availableTasks.add(task);
199                                }
200                                else
201                                {
202                                        for(int j = 0; j < parCnt; j++){
203                                                ParentType par = w.getParent(j);
204                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
205                                                        if(par.getContent().contains("_")){
206                                                                if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){
207                                                                        continue;
208                                                                }
209                                                        }
210                                                        else if(!checkTaskCompletion(task.getJobId(), par.getContent())){
211                                                                continue;
212                                                        }
213                                                }
214                                                previousTaskSucceedCnt++;
215                                        }
216
217                                        if(previousTaskSucceedCnt == parCnt)
218                                                availableTasks.add(task);
219                                }
220                        }
221                       
222                        /*try{         
223                                parCnt = task.getDescription().getWorkflow().getParentCount();
224                        } catch(Exception e){
225                                parCnt = 0;
226                        }
227                        if(parCnt == 0){
228                                availableTasks.add(task);
229                        }
230                        else {
231                                for(int j = 0; j < parCnt; j++){
232                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
233                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
234                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
235                                                        break;
236                                                }
237                                        }
238                                        previousTaskSucceedCnt++;
239                                }
240                               
241                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
242                                        availableTasks.add(task);
243                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
244                                        availableTasks.add(task);
245                                else if (previousTaskSucceedCnt == parCnt)
246                                        availableTasks.add(task);
247                        }*/
248                }               
249                return availableTasks;
250        }
251       
252        private boolean checkTaskCompletion(String jobID, String taskID){
253                JobInterface<?> job = getJobInfo(jobID);
254                try {
255                        if(job.getTask(taskID).isFinished())
256                                return true;
257                } catch (NoSuchFieldException e) {
258                        //e.printStackTrace();
259                }
260                return false;
261        }
262
263}
Note: See TracBrowser for help on using the repository browser.