source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/manager/tasks/JobRegistryImpl.java @ 1362

Revision 1362, 6.5 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.manager.tasks;
2
3import gridsim.dcworms.DCWormsTags;
4
5import java.util.ArrayList;
6import java.util.List;
7import java.util.Set;
8
9import org.apache.commons.logging.Log;
10import org.apache.commons.logging.LogFactory;
11import org.qcg.broker.schemas.resreqs.ParentType;
12import org.qcg.broker.schemas.resreqs.Workflow;
13import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
14
15import dcworms.schedframe.scheduling.ExecTask;
16
17import qcg.shared.constants.BrokerConstants;
18import schedframe.ExecutablesList;
19import schedframe.scheduling.tasks.JobInterface;
20import schedframe.scheduling.tasks.Task;
21
22public class JobRegistryImpl extends AbstractJobRegistry {
23
24        private static Log log = LogFactory.getLog(JobRegistryImpl.class);
25
26        private String context;
27
28        //TO DO - consider data structure
29        protected static final ExecutablesList executables = new ExecutablesList();
30        //protected static final List<ExecTask> executables = Collections.synchronizedList(new ArrayList<ExecTask>());;
31        //protected static final List<ExecTaskInterface> executables = new CopyOnWriteArrayList<ExecTaskInterface>();
32
33        public JobRegistryImpl(String context) {
34                this.context = context;
35        }
36
37        public boolean addExecTask(ExecTask newTask) {
38                if(getTask(newTask.getJobId(), newTask.getId()) == null) {
39                        synchronized (executables)  {
40                                executables.add(newTask);
41                        }
42                        return true;
43                }
44                return false;
45        }
46
47        public ExecutablesList getTasks() {
48                return executables;
49        }
50        public List<ExecTask> getTasks(int status) {
51                List<ExecTask> taskList = new ArrayList<ExecTask>();
52                synchronized (executables)  {
53                        for (ExecTask task: executables) {
54                                if (task.getStatus() == status) {
55                                        Set<String> visitedResource = task.getAllocatedResources().getLast().getVisitedResources();
56                                        for(String res: visitedResource){
57                                                if(res.contains(context)){
58                                                        taskList.add(task);
59                                                        break;
60                                                }
61                                        }
62                                        if(task.getSchedulerName().contains(context)) {
63                                                taskList.add(task);
64                                        }
65                                }
66                        }
67                }
68                return taskList;
69        }
70
71        public List<ExecTask> getQueuedTasks() {
72                return getTasks(DCWormsTags.QUEUED);
73        }
74       
75        public List<ExecTask> getRunningTasks() {
76                return getTasks(DCWormsTags.INEXEC);
77        }
78       
79        public List<ExecTask> getReadyTasks() {
80                return getTasks(DCWormsTags.READY);
81        }
82       
83        public List<ExecTask> getFinishedTasks() {
84                return getTasks(DCWormsTags.SUCCESS);
85        }
86       
87        public ExecTask getTask(String jobId, String taskId){
88                synchronized (executables)  {
89                        for (ExecTask task : executables) {
90                                if (task.getJobId().compareTo(jobId) == 0 && task.getId().compareTo(taskId) == 0) {
91                                        return task;
92                                }
93                        }
94                }
95                return null;
96        }
97
98        @SuppressWarnings("unchecked")
99        public List<Task> getAvailableTasks(List<JobInterface<?>> jobList) {
100                List<Task> availableTasks = new ArrayList<Task>();
101                List<Task> waitingTasks = new ArrayList<Task>();
102               
103                for(int i = 0; i < jobList.size(); i++){
104                        JobInterface<?> job = (JobInterface<?>)jobList.get(i);
105                        waitingTasks.addAll((List<Task>)job.getTask());
106                }
107
108                availableTasks.addAll(getPrecedenceConstrainedAvailableTasks(waitingTasks));
109                return availableTasks;
110        }
111
112        private List<Task> getPrecedenceConstrainedAvailableTasks(List<Task> tasks){
113               
114                List<Task> availableTasks = new ArrayList<Task>();
115                int size = tasks.size();
116               
117                for(int i = 0; i < size; i++){
118                        int parCnt;
119                        int previousTaskSucceedCnt = 0;
120                        Task task = tasks.get(i);
121                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
122                                continue;
123                        //the following procedure supports only one nested structure
124                        Workflow w = task.getDescription().getWorkflow();
125                        if (w == null){
126                                availableTasks.add(task);
127                                continue;
128                        }
129                        if(w.getAnd() != null) {
130                                parCnt = w.getAnd().getParentOpTypeItemCount();
131                                if(parCnt == 0)
132                                {
133                                        availableTasks.add(task);
134                                }
135                                else
136                                {
137                                        for(int j = 0; j < parCnt; j++){
138                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
139                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
140                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
141                                                                break;
142                                                        }
143                                                }
144                                                previousTaskSucceedCnt++;
145                                        }
146
147                                        if(previousTaskSucceedCnt == parCnt)
148                                                availableTasks.add(task);
149                                }
150                        }
151                        else if(w.getOr() != null) {
152                                parCnt = w.getOr().getParentOpTypeItemCount();
153                                if(parCnt == 0)
154                                {
155                                        availableTasks.add(task);
156                                }
157                                else
158                                {
159                                        for(int j = 0; j < parCnt; j++){
160                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
161                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
162                                                        if(!checkTaskCompletion(task.getJobId(), par.getContent())){
163                                                                continue;
164                                                        }
165                                                }
166                                                previousTaskSucceedCnt++;
167                                        }
168
169                                        if(previousTaskSucceedCnt > 0)
170                                                availableTasks.add(task);
171                                }
172                        }
173                        else {
174                                parCnt = w.getParentCount();
175                                if(parCnt == 0)
176                                {
177                                        availableTasks.add(task);
178                                }
179                                else
180                                {
181                                        for(int j = 0; j < parCnt; j++){
182                                                ParentType par = w.getParent(j);
183                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
184                                                        if(par.getContent().contains("_")){
185                                                                if(!checkTaskCompletion(par.getContent().split("_")[0], par.getContent().split("_")[1])){
186                                                                        continue;
187                                                                }
188                                                        }
189                                                        else if(!checkTaskCompletion(task.getJobId(), par.getContent())){
190                                                                continue;
191                                                        }
192                                                }
193                                                previousTaskSucceedCnt++;
194                                        }
195
196                                        if(previousTaskSucceedCnt == parCnt)
197                                                availableTasks.add(task);
198                                }
199                        }
200                       
201                        /*try{         
202                                parCnt = task.getDescription().getWorkflow().getParentCount();
203                        } catch(Exception e){
204                                parCnt = 0;
205                        }
206                        if(parCnt == 0){
207                                availableTasks.add(task);
208                        }
209                        else {
210                                for(int j = 0; j < parCnt; j++){
211                                        ParentType par = task.getDescription().getWorkflow().getParent(j);
212                                        if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
213                                                if(!checkTaskCompletion(task.getJobId(), par.getContent())){
214                                                        break;
215                                                }
216                                        }
217                                        previousTaskSucceedCnt++;
218                                }
219                               
220                                if(previousTaskSucceedCnt == parCnt && task.getDescription().getWorkflow().getAnd() != null)
221                                        availableTasks.add(task);
222                                else if(previousTaskSucceedCnt > 0 && task.getDescription().getWorkflow().getOr() != null)
223                                        availableTasks.add(task);
224                                else if (previousTaskSucceedCnt == parCnt)
225                                        availableTasks.add(task);
226                        }*/
227                }               
228                return availableTasks;
229        }
230       
231        private boolean checkTaskCompletion(String jobID, String taskID){
232                JobInterface<?> job = getJobInfo(jobID);
233                try {
234                        if(job.getTask(taskID).isFinished())
235                                return true;
236                } catch (NoSuchFieldException e) {
237                        //e.printStackTrace();
238                }
239                return false;
240        }
241
242}
Note: See TracBrowser for help on using the repository browser.