source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/tasks/Job.java @ 1207

Revision 1207, 6.9 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.tasks;
2
3
4import org.qcg.broker.schemas.resreqs.ParentType;
5import org.qcg.broker.schemas.resreqs.ResourceRequirements;
6import org.qcg.broker.schemas.resreqs.Workflow;
7import org.qcg.broker.schemas.resreqs.types.TaskStatesName;
8
9
10import java.io.StringWriter;
11import java.io.Writer;
12import java.util.ArrayList;
13import java.util.Iterator;
14import java.util.List;
15
16import org.joda.time.DateTime;
17
18import qcg.shared.constants.BrokerConstants;
19
20import schedframe.scheduling.WorkloadUnitHandler;
21import schedframe.scheduling.manager.tasks.JobRegistryImpl;
22
23
24/**
25 *
26 * @author Marcin Krystek
27 *
28 */
29public class Job implements JobInterface<ResourceRequirements> {
30
31        protected List<Task> tasks;
32        protected int senderId;
33        protected boolean isRegistered;
34       
35        private int status;
36       
37        public Job(String id){
38                tasks = new ArrayList<Task>();
39        }
40       
41        public Job(ResourceRequirements resourceRequirements) throws Exception{
42                org.qcg.broker.schemas.resreqs.Task task[] = resourceRequirements.getTask();
43                if(task == null || task.length == 0)
44                        throw new NoSuchFieldException("No tasks are defined for job.");
45               
46                this.tasks = new ArrayList<Task>();
47               
48                for(int i = 0; i < task.length; i++){
49                        this.tasks.add(new Task(task[i]));
50                }
51               
52                isRegistered = true;
53        }
54
55        public void add(Task task) {
56                this.tasks.add(task);
57        }
58       
59        public String getId() {
60                if(this.tasks.size() == 0)
61                        throw new RuntimeException("No tasks are defined for job, so it is not possible to obtain job id.");
62               
63                return this.tasks.get(0).getJobId();
64        }
65       
66        public List<Task> getTask() {
67                return this.tasks;
68        }
69       
70        public Task getTask(String taskId) throws NoSuchFieldException {
71                if(taskId == null)
72                        throw new IllegalArgumentException("TaskId can not be null. Specify appropriate taskId.");
73               
74                if(this.tasks == null || this.tasks.size() == 0)
75                        throw new NoSuchFieldException("No tasks are defined for job.");
76               
77                Task retTask = null;
78               
79                Iterator<Task> itr = this.tasks.iterator();
80                while(itr.hasNext() && retTask == null){
81                        Task task = itr.next();
82                        if(taskId.equals(task.getId())){
83                                retTask = task;
84                        }
85                }
86
87                if(retTask == null)
88                        throw new NoSuchFieldException("Task "+taskId + " is not available in job " + getId());
89               
90                return retTask;
91        }
92       
93        public int getTaskCount() {
94                return this.tasks.size();
95        }
96
97       
98        public ResourceRequirements getDescription() {
99               
100                ResourceRequirements resReq = new ResourceRequirements();
101                if(this.tasks == null)
102                        return resReq;
103               
104                Iterator<Task> itr = this.tasks.iterator();
105               
106                while(itr.hasNext()){
107                        Task task = (Task) itr.next();
108                        resReq.addTask(task.getDescription());
109                }
110               
111                return resReq;
112        }
113
114        public String getDocument() throws Exception {
115                ResourceRequirements resReq = getDescription();
116                Writer writer = new StringWriter();
117               
118                resReq.marshal(writer);
119               
120                return writer.toString();
121        }
122
123        public boolean isFinished(){
124               
125                for(int i = 0; i < tasks.size(); i++){
126                        //if(tasks.get(i).getStatus() != BrokerConstants.TASK_STATUS_FINISHED)
127                        //      return false;
128                        if(!tasks.get(i).isFinished())
129                                return false;
130                }
131                return true;
132        }
133       
134        public DateTime getSubmissionTimeToBroker(){
135                return tasks.get(0).getSubmissionTimeToBroker();
136        }
137       
138        public int getStatus(){
139                boolean isForAll = true;
140                int baseStatus = tasks.get(0).getStatus();
141               
142                for(int i = 1; i < tasks.size() && isForAll; i++){
143                        Task t = tasks.get(i);
144                        isForAll = (t.getStatus() == baseStatus);
145                        switch(t.getStatus()){
146                                case (int) BrokerConstants.TASK_STATUS_RUNNING:{
147                                        return (int)BrokerConstants.JOB_STATUS_ACTIVE;
148                                }
149                                case (int) BrokerConstants.TASK_STATUS_QUEUED:{
150                                        return (int)BrokerConstants.JOB_STATUS_ACTIVE;
151                                }
152                        }
153                }
154               
155                if(isForAll && baseStatus == BrokerConstants.TASK_STATUS_FINISHED){
156                        return (int)BrokerConstants.JOB_STATUS_FINISHED;
157                } else if( baseStatus == BrokerConstants.TASK_STATUS_RUNNING){
158                        return (int)BrokerConstants.JOB_STATUS_ACTIVE;
159                } else if( baseStatus == BrokerConstants.TASK_STATUS_QUEUED){
160                        return (int)BrokerConstants.JOB_STATUS_ACTIVE;
161                }
162               
163                return status;
164        }
165       
166        public boolean setStatus(String taskId, int status){
167                boolean found = false;
168                for(int i = 0; i < tasks.size() && !found; i++){
169                        Task t = tasks.get(i);
170                        if(taskId.equals(t.getId())){
171                                try {
172                                        t.setStatus(status);
173                                } catch (Exception e) {
174                                        // TODO Auto-generated catch block
175                                        e.printStackTrace();
176                                }
177                                found = true;
178                        }
179                }
180                return found;
181        }
182       
183        public void setSenderId(int id){
184                this.senderId = id;
185        }
186       
187        public int getSenderId(){
188                return this.senderId;
189        }
190       
191        public int getUserId(){
192                return this.senderId;
193        }
194
195        public void setStatus(int status){
196                this.status = status;
197        }
198
199        public boolean isRegistered() {
200                return isRegistered;
201        }
202
203        public void register(JobRegistryImpl jobRegistry) {
204                isRegistered = jobRegistry.addJob(this);
205        }
206       
207        @Override
208        public void accept(WorkloadUnitHandler wuh) {
209                wuh.handleJob(this);
210        }
211       
212        private List<Task> getReadyTasks() throws NoSuchFieldException{
213               
214                List<Task> availableTasks = new ArrayList<Task>();
215                int size = tasks.size();
216               
217                for(int i = 0; i < size; i++){
218                        int parCnt;
219                        int previousTaskSucceedCnt = 0;
220                        Task task = tasks.get(i);
221                        if(task.getStatus() != (int)BrokerConstants.TASK_STATUS_UNSUBMITTED)
222                                continue;
223                        //the following procedure supports only one nested structure
224                        Workflow w = task.getDescription().getWorkflow();
225                        if (w == null){
226                                availableTasks.add(task);
227                                continue;
228                        }
229                        if(w.getAnd() != null) {
230                                parCnt = w.getAnd().getParentOpTypeItemCount();
231                                if(parCnt == 0)
232                                {
233                                        availableTasks.add(task);
234                                }
235                                else
236                                {
237                                        for(int j = 0; j < parCnt; j++){
238                                                ParentType par = w.getAnd().getParentOpTypeItem(j).getParent();
239                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
240                                                        if(!getTask(par.getContent()).isFinished()){
241                                                                break;
242                                                        }
243                                                }
244                                                previousTaskSucceedCnt++;
245                                        }
246
247                                        if(previousTaskSucceedCnt == parCnt)
248                                                availableTasks.add(task);
249                                }
250                        }
251                        else if(w.getOr() != null) {
252                                parCnt = w.getOr().getParentOpTypeItemCount();
253                                if(parCnt == 0)
254                                {
255                                        availableTasks.add(task);
256                                }
257                                else
258                                {
259                                        for(int j = 0; j < parCnt; j++){
260                                                ParentType par = w.getOr().getParentOpTypeItem(j).getParent();
261                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
262                                                        if(!getTask(par.getContent()).isFinished()){
263                                                                continue;
264                                                        }
265                                                }
266                                                previousTaskSucceedCnt++;
267                                        }
268
269                                        if(previousTaskSucceedCnt > 0)
270                                                availableTasks.add(task);
271                                }
272                        }
273                        else {
274                                parCnt = w.getParentCount();
275                                if(parCnt == 0)
276                                {
277                                        availableTasks.add(task);
278                                }
279                                else
280                                {
281                                        for(int j = 0; j < parCnt; j++){
282                                                ParentType par = w.getParent(j);
283                                                if(par.getTriggerState().compareTo(TaskStatesName.FINISHED) == 0){
284                                                        if(!getTask(par.getContent()).isFinished()){
285                                                                continue;
286                                                        }
287                                                }
288                                                previousTaskSucceedCnt++;
289                                        }
290
291                                        if(previousTaskSucceedCnt == parCnt)
292                                                availableTasks.add(task);
293                                }
294                        }
295                }               
296                return availableTasks;
297        }
298       
299}
Note: See TracBrowser for help on using the repository browser.