source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1083

Revision 1083, 6.3 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7
8import org.apache.commons.lang.ArrayUtils;
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
12import org.qcg.broker.schemas.resreqs.Workflow;
13import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
14
15import dcworms.schedframe.scheduling.ExecTask;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.ExecutablesList;
19import schedframe.scheduling.tasks.JobInterface;
20import schedframe.scheduling.tasks.Task;
21
22public class JobRegistryImpl extends AbstractJobRegistry {
23       
24        private static final long serialVersionUID = 8030555906990767342L;
25
26        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
27
28        private String context;
29
30        //TO DO - consider data structure
31        protected static final ExecutablesList executables = new ExecutablesList();
32        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
33        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
34
35        public JobRegistryImpl(String context) {
36                this.context = context;
37        }
38
39        public boolean addExecTask(ExecTask newTask) {
40                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) {
41                        synchronized (executables)  {
42                                executables.add(newTask);
43                        }
44                        return true;
45                }
46                return false;
47        }
48
49        public ExecutablesList getExecutableTasks() {
50                return executables;
51        }
52        public List<ExecTask> getTasks(int status) {
53                List<ExecTask> taskList = new ArrayList<ExecTask>();
54                synchronized (executables)  {
55                        for (ExecTask task: executables) {
56                                if (task.getStatus() == status) {
57                                        List<String> visitedResource = task.getVisitedResources();
58                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
59                                                taskList.add(task);
60                                        }
61                                }
62                        }
63                }
64                return taskList;
65        }
66
67        public List<ExecTask> getQueuedTasks() {
68                return getTasks(DCWormsTags.QUEUED);
69        }
70       
71        public List<ExecTask> getRunningTasks() {
72                return getTasks(DCWormsTags.INEXEC);
73        }
74       
75        public List<ExecTask> getReadyTasks() {
76                return getTasks(DCWormsTags.READY);
77        }
78       
79        public List<ExecTask> getFinishedTasks() {
80                return getTasks(DCWormsTags.SUCCESS);
81        }
82       
83        public ExecTask getExecutable(String jobId, String taskId){
84                synchronized (executables)  {
85                        for (ExecTask task : executables) {
86                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
87                                        return task;
88                                }
89                        }
90                }
91                return null;
92        }
93
94        @SuppressWarnings("unchecked")
95        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) {
96                List<Task> availableTasks = new ArrayList<Task>();
97                List<Task> waitingTasks = new ArrayList<Task>();
98               
99                for(int i = 0; i < wuList.size(); i++){
100                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
101                        waitingTasks.addAll((List<Task>)wu.getTask());
102                }
103
104                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
105                return availableTasks;
106        }
107
108        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
109               
110                List<Task> availableTasks = new ArrayList<Task>();
111                int size = tasks.size();
112               
113                for(int i = 0; i < size; i++){
114                        int parCnt;
115                        int previousTaskSucceedCnt = 0;
116                        Task task = tasks.get(i);
117                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
118                                continue;
119                        //the following procedure supports only one nested structure
120                        Workflow w = task.getDescription().getWorkflow();
121                        if (w == null){
122                                availableTasks.add(task);
123                                continue;
124                        }
125                        if(w.getAnd() != null) {
126                                parCnt = w.getAnd().getParentOpTypeItemCount();
127                                if(parCnt == 0)
128                                {
129                                        availableTasks.add(task);
130                                }
131                                else
132                                {
133                                        for(int j = 0; j < parCnt; j++){
134                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
135                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
136                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
137                                                                break;
138                                                        }
139                                                }
140                                                previousTaskSucceedCnt++;
141                                        }
142
143                                        if(previousTaskSucceedCnt == parCnt)
144                                                availableTasks.add(task);
145                                }
146                        }
147                        else if(w.getOr() != null) {
148                                parCnt = w.getOr().getParentOpTypeItemCount();
149                                if(parCnt == 0)
150                                {
151                                        availableTasks.add(task);
152                                }
153                                else
154                                {
155                                        for(int j = 0; j < parCnt; j++){
156                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
157                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
158                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
159                                                                continue;
160                                                        }
161                                                }
162                                                previousTaskSucceedCnt++;
163                                        }
164
165                                        if(previousTaskSucceedCnt > 0)
166                                                availableTasks.add(task);
167                                }
168                        }
169                        else {
170                                parCnt = w.getParentCount();
171                                if(parCnt == 0)
172                                {
173                                        availableTasks.add(task);
174                                }
175                                else
176                                {
177                                        for(int j = 0; j < parCnt; j++){
178                                                ParentType par = w.getParent(j);
179                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
180                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
181                                                                continue;
182                                                        }
183                                                }
184                                                previousTaskSucceedCnt++;
185                                        }
186
187                                        if(previousTaskSucceedCnt == parCnt)
188                                                availableTasks.add(task);
189                                }
190                        }
191                       
192                        /*try{         
193                                parCnt = task.getDescription().getWorkflow().getParentCount();
194                        } catch(Exception e){
195                                parCnt = 0;
196                        }
197                        if(parCnt == 0){
198                                availableTasks.add(task);
199                        }
200                        else {
201                                for(int j = 0; j < parCnt; j++){
202                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
203                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
204                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
205                                                        break;
206                                                }
207                                        }
208                                        previousTaskSucceedCnt++;
209                                }
210                               
211                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
212                                        availableTasks.add(task);
213                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
214                                        availableTasks.add(task);
215                                else if (previousTaskSucceedCnt == parCnt)
216                                        availableTasks.add(task);
217                        }*/
218                }               
219                return availableTasks;
220        }
221       
222        private boolean checkTaskCompletion(String jobID, String taskID){
223                JobInterface<?> job = getJobInfo(jobID);
224                try {
225                        if(job.getTask(taskID).isFinished())
226                                return true;
227                } catch (NoSuchFieldException e) {
228                        //e.printStackTrace();
229                }
230                return false;
231        }
232
233}
Note: See TracBrowser for help on using the repository browser.