source: DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1049

Revision 1049, 6.3 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7
8import org.apache.commons.lang.ArrayUtils;
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
12import org.qcg.broker.schemas.resreqs.Workflow;
13import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
14
15import dcworms.schedframe.scheduling.ExecTask;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.ExecutablesList;
19import schedframe.scheduling.tasks.JobInterface;
20import schedframe.scheduling.tasks.Task;
21
22public class JobRegistryImpl extends AbstractJobRegistry {
23       
24        private static final long serialVersionUID = 8030555906990767342L;
25
26        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
27
28        private String context;
29
30        //TO DO - consider data structure
31        protected static final ExecutablesList executables = new ExecutablesList();
32        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
33        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
34
35        public JobRegistryImpl(String context) {
36                this.context = context;
37        }
38
39        public boolean addExecTask(ExecTask newTask) {
40                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) {
41                        synchronized (executables)  {
42                                executables.add(newTask);
43                        }
44                        return true;
45                }
46                return false;
47        }
48
49        public ExecutablesList getExecutableTasks() {
50                return executables;
51        }
52        public List<ExecTask> getTasks(int status) {
53                List<ExecTask> taskList = new ArrayList<ExecTask>();
54                synchronized (executables)  {
55                        for (ExecTask task: executables) {
56                                if (task.getStatus() == status) {
57                                        List<String> visitedResource = task.getVisitedResources();
58                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
59                                                taskList.add(task);
60                                        }
61                                }
62                        }
63                }
64                return taskList;
65        }
66
67        public List<ExecTask> getQueuedTasks() {
68                return getTasks(DCWormsTags.QUEUED);
69        }
70       
71        public List<ExecTask> getRunningTasks() {
72                return getTasks(DCWormsTags.INEXEC);
73        }
74       
75        public List<ExecTask> getReadyTasks() {
76                return getTasks(DCWormsTags.READY);
77        }
78       
79        public List<ExecTask> getFinishedTasks() {
80                return getTasks(DCWormsTags.SUCCESS);
81        }
82       
83        public ExecTask getExecutable(String jobId, String taskId){
84                synchronized (executables)  {
85                        for (ExecTask task : executables) {
86                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
87                                        return task;
88                                }
89                        }
90                }
91                return null;
92        }
93
94        @SuppressWarnings("unchecked")
95        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) {
96                List<Task> availableTasks = new ArrayList<Task>();
97                List<Task> waitingTasks = new ArrayList<Task>();
98               
99                for(int i = 0; i < wuList.size(); i++){
100                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
101                        waitingTasks.addAll((List<Task>)wu.getTask());
102                }
103
104                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
105                return availableTasks;
106        }
107
108
109        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
110               
111                List<Task> availableTasks = new ArrayList<Task>();
112                int size = tasks.size();
113               
114                for(int i = 0; i < size; i++){
115                        int parCnt;
116                        int previousTaskSucceedCnt = 0;
117                        Task task = tasks.get(i);
118                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
119                                continue;
120                        //the following procedure supports only one nested structure
121                        Workflow w = task.getDescription().getWorkflow();
122                        if (w == null){
123                                availableTasks.add(task);
124                                continue;
125                        }
126                        if(w.getAnd() != null) {
127                                parCnt = w.getAnd().getParentOpTypeItemCount();
128                                if(parCnt == 0)
129                                {
130                                        availableTasks.add(task);
131                                }
132                                else
133                                {
134                                        for(int j = 0; j < parCnt; j++){
135                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
136                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
137                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
138                                                                break;
139                                                        }
140                                                }
141                                                previousTaskSucceedCnt++;
142                                        }
143
144                                        if(previousTaskSucceedCnt == parCnt)
145                                                availableTasks.add(task);
146                                }
147                        }
148                        else if(w.getOr() != null) {
149                                parCnt = w.getOr().getParentOpTypeItemCount();
150                                if(parCnt == 0)
151                                {
152                                        availableTasks.add(task);
153                                }
154                                else
155                                {
156                                        for(int j = 0; j < parCnt; j++){
157                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
158                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
159                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
160                                                                continue;
161                                                        }
162                                                }
163                                                previousTaskSucceedCnt++;
164                                        }
165
166                                        if(previousTaskSucceedCnt > 0)
167                                                availableTasks.add(task);
168                                }
169                        }
170                       
171                        else {
172                                parCnt = w.getParentCount();
173                                if(parCnt == 0)
174                                {
175                                        availableTasks.add(task);
176                                }
177                                else
178                                {
179                                        for(int j = 0; j < parCnt; j++){
180                                                ParentType par = w.getParent(j);
181                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
182                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
183                                                                continue;
184                                                        }
185                                                }
186                                                previousTaskSucceedCnt++;
187                                        }
188
189                                        if(previousTaskSucceedCnt == parCnt)
190                                                availableTasks.add(task);
191                                }
192                        }
193                       
194                        /*try{         
195                                parCnt = task.getDescription().getWorkflow().getParentCount();
196                        } catch(Exception e){
197                                parCnt = 0;
198                        }
199                        if(parCnt == 0){
200                                availableTasks.add(task);
201                        }
202                        else {
203                                for(int j = 0; j < parCnt; j++){
204                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
205                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
206                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
207                                                        break;
208                                                }
209                                        }
210                                        previousTaskSucceedCnt++;
211                                }
212                               
213                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
214                                        availableTasks.add(task);
215                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
216                                        availableTasks.add(task);
217                                else if (previousTaskSucceedCnt == parCnt)
218                                        availableTasks.add(task);
219                        }*/
220                }               
221                return availableTasks;
222        }
223       
224        private boolean checkTaskCompletion(String jobID, String taskID){
225                JobInterface<?> job = getJobInfo(jobID);
226                try {
227                        if(job.getTask(taskID).isFinished())
228                                return true;
229                } catch (NoSuchFieldException e) {
230                        //e.printStackTrace();
231                }
232                return false;
233        }
234
235}
Note: See TracBrowser for help on using the repository browser.