source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1151

Revision 1151, 6.4 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7
8import org.apache.commons.lang.ArrayUtils;
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
12import org.qcg.broker.schemas.resreqs.Workflow;
13import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
14
15import dcworms.schedframe.scheduling.ExecTask;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.ExecutablesList;
19import schedframe.scheduling.tasks.JobInterface;
20import schedframe.scheduling.tasks.Task;
21
22public class JobRegistryImpl extends AbstractJobRegistry {
23       
24        private static final long serialVersionUID = 8030555906990767342L;
25
26        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
27
28        private String context;
29
30        //TO DO - consider data structure
31        protected static final ExecutablesList executables = new ExecutablesList();
32        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
33        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
34
35        public JobRegistryImpl(String context) {
36                this.context = context;
37        }
38
39        public boolean addExecTask(ExecTask newTask) {
40                if(getTask(newTask.getJobId(), newTask.getId()) == null) {
41                        synchronized (executables)  {
42                                executables.add(newTask);
43                        }
44                        return true;
45                }
46                return false;
47        }
48
49        public ExecutablesList getTasks() {
50                return executables;
51        }
52        public List<ExecTask> getTasks(int status) {
53                List<ExecTask> taskList = new ArrayList<ExecTask>();
54                synchronized (executables)  {
55                        for (ExecTask task: executables) {
56                                if (task.getStatus() == status) {
57                                        List<String> visitedResource = task.getVisitedResources();
58                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
59                                                taskList.add(task);
60                                        }
61                                }
62                        }
63                }
64                return taskList;
65        }
66
67        public List<ExecTask> getQueuedTasks() {
68                return getTasks(DCWormsTags.QUEUED);
69        }
70       
71        public List<ExecTask> getRunningTasks() {
72                return getTasks(DCWormsTags.INEXEC);
73        }
74       
75        public List<ExecTask> getReadyTasks() {
76                return getTasks(DCWormsTags.READY);
77        }
78       
79        public List<ExecTask> getFinishedTasks() {
80                return getTasks(DCWormsTags.SUCCESS);
81        }
82       
83        public ExecTask getTask(String jobId, String taskId){
84                synchronized (executables)  {
85                        for (ExecTask task : executables) {
86                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
87                                        return task;
88                                }
89                        }
90                }
91                return null;
92        }
93
94        @SuppressWarnings("unchecked")
95        public List<Task> getAvailableTasks(List<JobInterface<?>> jobList) {
96                List<Task> availableTasks = new ArrayList<Task>();
97                List<Task> waitingTasks = new ArrayList<Task>();
98               
99                for(int i = 0; i < jobList.size(); i++){
100                        JobInterface<?> job = (JobInterface<?>)jobList.get(i);
101                        waitingTasks.addAll((List<Task>)job.getTask());
102                }
103
104                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
105                return availableTasks;
106        }
107
108        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
109               
110                List<Task> availableTasks = new ArrayList<Task>();
111                int size = tasks.size();
112               
113                for(int i = 0; i < size; i++){
114                        int parCnt;
115                        int previousTaskSucceedCnt = 0;
116                        Task task = tasks.get(i);
117                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
118                                continue;
119                        //the following procedure supports only one nested structure
120                        Workflow w = task.getDescription().getWorkflow();
121                        if (w == null){
122                                availableTasks.add(task);
123                                continue;
124                        }
125                        if(w.getAnd() != null) {
126                                parCnt = w.getAnd().getParentOpTypeItemCount();
127                                if(parCnt == 0)
128                                {
129                                        availableTasks.add(task);
130                                }
131                                else
132                                {
133                                        for(int j = 0; j < parCnt; j++){
134                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
135                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
136                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
137                                                                break;
138                                                        }
139                                                }
140                                                previousTaskSucceedCnt++;
141                                        }
142
143                                        if(previousTaskSucceedCnt == parCnt)
144                                                availableTasks.add(task);
145                                }
146                        }
147                        else if(w.getOr() != null) {
148                                parCnt = w.getOr().getParentOpTypeItemCount();
149                                if(parCnt == 0)
150                                {
151                                        availableTasks.add(task);
152                                }
153                                else
154                                {
155                                        for(int j = 0; j < parCnt; j++){
156                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
157                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
158                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
159                                                                continue;
160                                                        }
161                                                }
162                                                previousTaskSucceedCnt++;
163                                        }
164
165                                        if(previousTaskSucceedCnt > 0)
166                                                availableTasks.add(task);
167                                }
168                        }
169                        else {
170                                parCnt = w.getParentCount();
171                                if(parCnt == 0)
172                                {
173                                        availableTasks.add(task);
174                                }
175                                else
176                                {
177                                        for(int j = 0; j < parCnt; j++){
178                                                ParentType par = w.getParent(j);
179                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
180                                                        if(par.getContent().contains("_")){
181                                                                if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){
182                                                                        continue;
183                                                                }
184                                                        }
185                                                        else if(!checkTaskCompletion(task.getJobId(), par.getContent())){
186                                                                continue;
187                                                        }
188                                                }
189                                                previousTaskSucceedCnt++;
190                                        }
191
192                                        if(previousTaskSucceedCnt == parCnt)
193                                                availableTasks.add(task);
194                                }
195                        }
196                       
197                        /*try{         
198                                parCnt = task.getDescription().getWorkflow().getParentCount();
199                        } catch(Exception e){
200                                parCnt = 0;
201                        }
202                        if(parCnt == 0){
203                                availableTasks.add(task);
204                        }
205                        else {
206                                for(int j = 0; j < parCnt; j++){
207                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
208                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
209                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
210                                                        break;
211                                                }
212                                        }
213                                        previousTaskSucceedCnt++;
214                                }
215                               
216                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
217                                        availableTasks.add(task);
218                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
219                                        availableTasks.add(task);
220                                else if (previousTaskSucceedCnt == parCnt)
221                                        availableTasks.add(task);
222                        }*/
223                }               
224                return availableTasks;
225        }
226       
227        private boolean checkTaskCompletion(String jobID, String taskID){
228                JobInterface<?> job = getJobInfo(jobID);
229                try {
230                        if(job.getTask(taskID).isFinished())
231                                return true;
232                } catch (NoSuchFieldException e) {
233                        //e.printStackTrace();
234                }
235                return false;
236        }
237
238}
Note: See TracBrowser for help on using the repository browser.