source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1452

Revision 1452, 7.1 KB checked in by wojtekp, 10 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Set;
8
9import org.qcg.broker.schemas.resreqs.ParentType;
10import org.qcg.broker.schemas.resreqs.Workflow;
11import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
12
13import qcg.shared.constants.BrokerConstants;
14import schedframe.ExecutablesList;
15import schedframe.resources.computing.ComputingResource;
16import schedframe.scheduling.tasks.JobInterface;
17import schedframe.scheduling.tasks.Task;
18import dcworms.schedframe.scheduling.ExecTask;
19
20public class JobRegistryImpl extends AbstractJobRegistry {
21
22        private String context;
23        private ComputingResource cr;
24        //TO DO - consider data structure
25        protected static final ExecutablesList executables = new ExecutablesList();
26
27        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
28        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
29
30        public JobRegistryImpl(String context) {
31                this.context = context;
32        }
33       
34        public JobRegistryImpl(ComputingResource cr) {
35                this.cr = cr;
36        }
37
38        public JobRegistryImpl() {
39                this("");
40        }
41       
42        public boolean addExecTask(ExecTask newTask) {
43                if(getTask(newTask.getJobId(), newTask.getId()) == null) {
44                        synchronized (executables) {
45                                executables.add(newTask);
46                        }
47                        return true;
48                }
49                return false;
50        }
51
52        public ExecutablesList getTasks() {
53                return executables;
54        }
55        public List<ExecTask> getTasks(int status) {
56                List<ExecTask> taskList = new ArrayList<ExecTask>();
57                synchronized (executables)  {
58                        for (ExecTask task: executables) {
59                                if (task.getStatus() == status) {
60                                        if(cr != null){
61                                                if(!task.getAllocatedResources().isEmpty()){
62                                                        Set<ComputingResource> visitedResource = task.getAllocatedResources().getLast().getResources();
63                                                        if(visitedResource.contains(cr)){
64                                                                taskList.add(task);
65                                                        } else {
66                                                                for(ComputingResource res: visitedResource){
67                                                                        if(cr.contains(res)){
68                                                                                taskList.add(task);
69                                                                                break;
70                                                                        }
71                                                                }
72                                                        }
73                                                }
74                                        } else {
75                                                if(!task.getAllocatedResources().isEmpty()){
76                                                        Set<String> visitedResource = task.getAllocatedResources().getLast().getResourceNames();
77                                                        for(String res: visitedResource){
78                                                                if(res.equals(context) || res.substring(0, res.lastIndexOf("/")).contains(context)){
79                                                                        taskList.add(task);
80                                                                        break;
81                                                                }
82                                                        }       
83                                                }
84
85                                                if(task.getSchedulerName().equals(context)) {
86                                                        taskList.add(task);
87                                                }
88                                        }
89                                }
90                        }
91                }
92
93                return taskList;
94        }
95
96        public List<ExecTask> getQueuedTasks() {
97                return getTasks(DCWormsTags.QUEUED);
98        }
99       
100        public List<ExecTask> getRunningTasks() {
101                return getTasks(DCWormsTags.INEXEC);
102        }
103       
104        public List<ExecTask> getReadyTasks() {
105                return getTasks(DCWormsTags.READY);
106        }
107       
108        public List<ExecTask> getFinishedTasks() {
109                return getTasks(DCWormsTags.SUCCESS);
110        }
111       
112        public ExecTask getTask(String jobId, String taskId){
113                synchronized (executables)  {
114                        for (ExecTask task : executables) {
115                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
116                                        return task;
117                                }
118                        }
119                }
120                return null;
121        }
122
123        @SuppressWarnings("unchecked")
124        public List<Task> getAvailableTasks(List<JobInterface<?>> jobList) {
125                List<Task> availableTasks = new ArrayList<Task>();
126                List<Task> waitingTasks = new ArrayList<Task>();
127               
128                for(int i = 0; i < jobList.size(); i++){
129                        JobInterface<?> job = (JobInterface<?>)jobList.get(i);
130                        waitingTasks.addAll((List<Task>)job.getTask());
131                }
132
133                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
134                return availableTasks;
135        }
136
137        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
138               
139                List<Task> availableTasks = new ArrayList<Task>();
140                int size = tasks.size();
141               
142                for(int i = 0; i < size; i++){
143                        int parCnt;
144                        int previousTaskSucceedCnt = 0;
145                        Task task = tasks.get(i);
146                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
147                                continue;
148                        //the following procedure supports only one nested structure
149                        Workflow w = task.getDescription().getWorkflow();
150                        if (w == null){
151                                availableTasks.add(task);
152                                continue;
153                        }
154                        if(w.getAnd() != null) {
155                                parCnt = w.getAnd().getParentOpTypeItemCount();
156                                if(parCnt == 0)
157                                {
158                                        availableTasks.add(task);
159                                }
160                                else
161                                {
162                                        for(int j = 0; j < parCnt; j++){
163                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
164                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
165                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
166                                                                break;
167                                                        }
168                                                }
169                                                previousTaskSucceedCnt++;
170                                        }
171
172                                        if(previousTaskSucceedCnt == parCnt)
173                                                availableTasks.add(task);
174                                }
175                        }
176                        else if(w.getOr() != null) {
177                                parCnt = w.getOr().getParentOpTypeItemCount();
178                                if(parCnt == 0)
179                                {
180                                        availableTasks.add(task);
181                                }
182                                else
183                                {
184                                        for(int j = 0; j < parCnt; j++){
185                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
186                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
187                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
188                                                                continue;
189                                                        }
190                                                }
191                                                previousTaskSucceedCnt++;
192                                        }
193
194                                        if(previousTaskSucceedCnt > 0)
195                                                availableTasks.add(task);
196                                }
197                        }
198                        else {
199                                parCnt = w.getParentCount();
200                                if(parCnt == 0)
201                                {
202                                        availableTasks.add(task);
203                                }
204                                else
205                                {
206                                        for(int j = 0; j < parCnt; j++){
207                                                ParentType par = w.getParent(j);
208                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
209                                                        if(par.getContent().contains("_")){
210                                                                if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){
211                                                                        continue;
212                                                                }
213                                                        }
214                                                        else if(!checkTaskCompletion(task.getJobId(), par.getContent())){
215                                                                continue;
216                                                        }
217                                                }
218                                                previousTaskSucceedCnt++;
219                                        }
220
221                                        if(previousTaskSucceedCnt == parCnt)
222                                                availableTasks.add(task);
223                                }
224                        }
225                       
226                        /*try{         
227                                parCnt = task.getDescription().getWorkflow().getParentCount();
228                        } catch(Exception e){
229                                parCnt = 0;
230                        }
231                        if(parCnt == 0){
232                                availableTasks.add(task);
233                        }
234                        else {
235                                for(int j = 0; j < parCnt; j++){
236                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
237                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
238                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
239                                                        break;
240                                                }
241                                        }
242                                        previousTaskSucceedCnt++;
243                                }
244                               
245                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
246                                        availableTasks.add(task);
247                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
248                                        availableTasks.add(task);
249                                else if (previousTaskSucceedCnt == parCnt)
250                                        availableTasks.add(task);
251                        }*/
252                }               
253                return availableTasks;
254        }
255       
256        private boolean checkTaskCompletion(String jobID, String taskID){
257                JobInterface<?> job = getJobInfo(jobID);
258                try {
259                        if(job.getTask(taskID).isFinished())
260                                return true;
261                } catch (NoSuchFieldException e) {
262                        //e.printStackTrace();
263                }
264                return false;
265        }
266       
267        public static void reset(){
268                jobs.clear();
269                executables.clear();
270        }
271
272}
Note: See TracBrowser for help on using the repository browser.