source: DCWoRMS/trunk/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1061

Revision 1061, 6.3 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
RevLine 
[477]1package schedframe.scheduling.manager.tasks;
2
[493]3import gridsim.dcworms.DCWormsTags;
[477]4
5import java.util.ArrayList;
6import java.util.List;
7
8import org.apache.commons.lang.ArrayUtils;
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
[1049]12import org.qcg.broker.schemas.resreqs.Workflow;
[477]13import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
14
[490]15import dcworms.schedframe.scheduling.ExecTask;
16
[477]17import qcg.shared.constants.BrokerConstants;
[481]18import schedframe.ExecutablesList;
[477]19import schedframe.scheduling.tasks.JobInterface;
20import schedframe.scheduling.tasks.Task;
21
22public class JobRegistryImpl extends AbstractJobRegistry {
23       
24        private static final long serialVersionUID = 8030555906990767342L;
25
26        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
27
28        private String context;
29
[481]30        //TO DO - consider data structure
31        protected static final ExecutablesList executables = new ExecutablesList();
32        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
33        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
[477]34
[481]35        public JobRegistryImpl(String context) {
36                this.context = context;
[477]37        }
38
[481]39        public boolean addExecTask(ExecTask newTask) {
40                if(getExecutable(newTask.getJobId(), newTask.getId()) == null) {
41                        synchronized (executables)  {
42                                executables.add(newTask);
[477]43                        }
44                        return true;
45                }
46                return false;
47        }
48
[481]49        public ExecutablesList getExecutableTasks() {
50                return executables;
51        }
[477]52        public List<ExecTask> getTasks(int status) {
53                List<ExecTask> taskList = new ArrayList<ExecTask>();
[481]54                synchronized (executables)  {
55                        for (ExecTask task: executables) {
[477]56                                if (task.getStatus() == status) {
57                                        List<String> visitedResource = task.getVisitedResources();
[481]58                                        if(ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), context)) {
[477]59                                                taskList.add(task);
60                                        }
61                                }
62                        }
63                }
64                return taskList;
65        }
66
67        public List<ExecTask> getQueuedTasks() {
[481]68                return getTasks(DCWormsTags.QUEUED);
[477]69        }
70       
71        public List<ExecTask> getRunningTasks() {
[481]72                return getTasks(DCWormsTags.INEXEC);
[477]73        }
74       
75        public List<ExecTask> getReadyTasks() {
[481]76                return getTasks(DCWormsTags.READY);
[477]77        }
78       
79        public List<ExecTask> getFinishedTasks() {
[481]80                return getTasks(DCWormsTags.SUCCESS);
[477]81        }
82       
[481]83        public ExecTask getExecutable(String jobId, String taskId){
84                synchronized (executables)  {
85                        for (ExecTask task : executables) {
[490]86                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
[477]87                                        return task;
88                                }
89                        }
90                }
91                return null;
92        }
93
94        @SuppressWarnings("unchecked")
[481]95        public List<Task> getAvailableTasks(List<JobInterface<?>> wuList) {
96                List<Task> availableTasks = new ArrayList<Task>();
[477]97                List<Task> waitingTasks = new ArrayList<Task>();
98               
99                for(int i = 0; i < wuList.size(); i++){
100                        JobInterface<?> wu  = (JobInterface<?>)wuList.get(i);
101                        waitingTasks.addAll((List<Task>)wu.getTask());
102                }
103
[481]104                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
105                return availableTasks;
[477]106        }
107
108
[481]109        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
[477]110               
[481]111                List<Task> availableTasks = new ArrayList<Task>();
112                int size = tasks.size();
[477]113               
114                for(int i = 0; i < size; i++){
115                        int parCnt;
[481]116                        int previousTaskSucceedCnt = 0;
[477]117                        Task task = tasks.get(i);
118                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
119                                continue;
[1049]120                        //the following procedure supports only one nested structure
121                        Workflow w = task.getDescription().getWorkflow();
122                        if (w == null){
123                                availableTasks.add(task);
124                                continue;
125                        }
126                        if(w.getAnd() != null) {
127                                parCnt = w.getAnd().getParentOpTypeItemCount();
128                                if(parCnt == 0)
129                                {
130                                        availableTasks.add(task);
131                                }
132                                else
133                                {
134                                        for(int j = 0; j < parCnt; j++){
135                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
136                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
137                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
138                                                                break;
139                                                        }
140                                                }
141                                                previousTaskSucceedCnt++;
142                                        }
143
144                                        if(previousTaskSucceedCnt == parCnt)
145                                                availableTasks.add(task);
146                                }
147                        }
148                        else if(w.getOr() != null) {
149                                parCnt = w.getOr().getParentOpTypeItemCount();
150                                if(parCnt == 0)
151                                {
152                                        availableTasks.add(task);
153                                }
154                                else
155                                {
156                                        for(int j = 0; j < parCnt; j++){
157                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
158                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
159                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
160                                                                continue;
161                                                        }
162                                                }
163                                                previousTaskSucceedCnt++;
164                                        }
165
166                                        if(previousTaskSucceedCnt > 0)
167                                                availableTasks.add(task);
168                                }
169                        }
170                        else {
171                                parCnt = w.getParentCount();
172                                if(parCnt == 0)
173                                {
174                                        availableTasks.add(task);
175                                }
176                                else
177                                {
178                                        for(int j = 0; j < parCnt; j++){
179                                                ParentType par = w.getParent(j);
180                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
181                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
182                                                                continue;
183                                                        }
184                                                }
185                                                previousTaskSucceedCnt++;
186                                        }
187
188                                        if(previousTaskSucceedCnt == parCnt)
189                                                availableTasks.add(task);
190                                }
191                        }
192                       
193                        /*try{         
[477]194                                parCnt = task.getDescription().getWorkflow().getParentCount();
195                        } catch(Exception e){
196                                parCnt = 0;
197                        }
[481]198                        if(parCnt == 0){
199                                availableTasks.add(task);
[477]200                        }
[481]201                        else {
[477]202                                for(int j = 0; j < parCnt; j++){
203                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
204                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
205                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
206                                                        break;
207                                                }
208                                        }
[481]209                                        previousTaskSucceedCnt++;
[477]210                                }
211                               
[481]212                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
213                                        availableTasks.add(task);
214                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
215                                        availableTasks.add(task);
216                                else if (previousTaskSucceedCnt == parCnt)
217                                        availableTasks.add(task);
[1049]218                        }*/
[477]219                }               
[481]220                return availableTasks;
[477]221        }
222       
223        private boolean checkTaskCompletion(String jobID, String taskID){
224                JobInterface<?> job = getJobInfo(jobID);
225                try {
226                        if(job.getTask(taskID).isFinished())
227                                return true;
228                } catch (NoSuchFieldException e) {
229                        //e.printStackTrace();
230                }
231                return false;
232        }
[481]233
[477]234}
Note: See TracBrowser for help on using the repository browser.