source: DCWoRMS/trunk/src/schedframe/scheduling/tasks/Task.java @ 477

Revision 477, 14.3 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.tasks;
2
3import org.qcg.broker.schemas.resreqs.ComputingResource;
4import org.qcg.broker.schemas.resreqs.ComputingResourceBaseTypeItem;
5import org.qcg.broker.schemas.resreqs.ComputingResourceExtType;
6import org.qcg.broker.schemas.resreqs.ComputingResourceParameterType;
7import org.qcg.broker.schemas.resreqs.ExecutionTimeType;
8import org.qcg.broker.schemas.resreqs.ProcessesResourceRequirements;
9import org.qcg.broker.schemas.resreqs.Requirements;
10import org.qcg.broker.schemas.resreqs.TaskResourceRequirements;
11import org.qcg.broker.schemas.resreqs.TimePeriod;
12import org.qcg.broker.schemas.resreqs.TimePeriodChoice;
13import org.qcg.broker.schemas.resreqs.Topology;
14import org.qcg.broker.schemas.resreqs.types.ComputingResourceParameterTypeNameType;
15
16import java.io.StringReader;
17import java.io.StringWriter;
18import java.util.ArrayList;
19import java.util.List;
20
21import org.exolab.castor.xml.Marshaller;
22import org.exolab.castor.xml.ResolverException;
23import org.exolab.castor.xml.Unmarshaller;
24import org.exolab.castor.xml.XMLContext;
25import org.joda.time.DateTime;
26import org.joda.time.Duration;
27import org.joda.time.MutableDateTime;
28import org.joda.time.ReadableDuration;
29
30import schedframe.scheduling.WorkloadUnitHandler;
31import schedframe.scheduling.manager.tasks.JobRegistryImpl;
32import schedframe.scheduling.tasks.requirements.ResourceParameterName;
33
34/**
35 *
36 * @author Marcin Krystek
37 *
38 */
39public class Task /*extends AbstractTask*/ implements TaskInterface<org.qcg.broker.schemas.resreqs.Task> {
40       
41        protected static Unmarshaller unmarshaller;
42        protected static Marshaller marshaller;
43        protected boolean isRegistered;
44       
45        static {
46                XMLContext context = new XMLContext();
47                try {
48                        context.addClass(org.qcg.broker.schemas.resreqs.Task.class);
49                        unmarshaller = context.createUnmarshaller();
50                        marshaller = context.createMarshaller();
51                } catch (ResolverException e) {
52                        e.printStackTrace();
53                        unmarshaller = null;
54                        marshaller = null;
55                }
56        }
57       
58        protected org.qcg.broker.schemas.resreqs.Task task;
59        /*
60         * The values for following variables are obtained from native Task
61         * object. This should significantly speed up access to task details.
62         */
63        private DateTime startTime;
64        private DateTime endTime;
65        private DateTime brokerSubmitTime;
66        private ReadableDuration duration;
67        private List<AbstractProcessesGroup> groups;
68        private List<AbstractProcesses> processes;
69        private long length;
70        private int status;
71        private int senderId;
72        private long workloadLogWaitTime;
73        //String resPathHistory;
74       
75        public Task(org.qcg.broker.schemas.resreqs.Task task){
76                this.task = task;
77                this.startTime = null;
78                this.endTime = null;
79                this.brokerSubmitTime = null;
80                this.duration = null;
81        //      this.gridletID_ = (getJobId() + "_" + getId()).hashCode();
82                prepareTopology();
83        }
84       
85        public Task(String task) throws Exception{
86                StringReader reader = new StringReader(task);
87                this.task = (org.qcg.broker.schemas.resreqs.Task) unmarshaller.unmarshal(reader);
88                this.startTime = null;
89                this.endTime = null;
90                this.brokerSubmitTime = null;
91                this.duration = null;
92        //      this.gridletID_ = (getJobId() + getId()).hashCode();
93                prepareTopology();
94        }
95
96        public DateTime getExecutionStartTime() throws NoSuchFieldException {
97                if(this.startTime != null)
98                        return this.startTime;
99               
100                ExecutionTimeType execTime = this.task.getExecutionTime();
101                if(execTime == null)
102                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
103                                                                                + " task "+ getId() + " is not defined.");
104               
105                TimePeriod timePeriod = execTime.getTimePeriod();
106                if(timePeriod == null)
107                        throw new NoSuchFieldException("Time Period for job " + getJobId()
108                                        + " task "+ getId() + " is not defined.");
109               
110                this.startTime = new DateTime(timePeriod.getPeriodStart());
111                return this.startTime;
112        }
113
114        public DateTime getExecutionEndTime() throws NoSuchFieldException {
115                if(this.endTime != null)
116                        return this.endTime;
117               
118                ExecutionTimeType execTime = this.task.getExecutionTime();
119                if(execTime == null)
120                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
121                                                                                + " task "+ getId() + " is not defined.");
122               
123                TimePeriod timePeriod = execTime.getTimePeriod();
124                if(timePeriod == null)
125                        throw new NoSuchFieldException("Time Period for job " + getJobId()
126                                        + " task "+ getId() + " is not defined.");
127               
128                TimePeriodChoice periodChoice = timePeriod.getTimePeriodChoice();
129                if(periodChoice == null)
130                        throw new NoSuchFieldException("Period End and Period Duration for job " + getJobId()
131                                        + " task "+ getId() + " are not defined.");
132               
133                java.util.Date periodEnd = periodChoice.getPeriodEnd();
134                if(periodEnd != null) {
135                        this.endTime = new DateTime(periodEnd);
136                        return this.endTime;
137                }
138               
139                org.exolab.castor.types.Duration duration = periodChoice.getPeriodDuration();
140                if(duration == null)
141                        throw new NoSuchFieldException("Period Duration for job " + getJobId()
142                                        + " task "+ getId() + " is not defined.");
143               
144                DateTime periodStart = getExecutionStartTime();
145                MutableDateTime m_periodEnd = periodStart.toMutableDateTime();
146                m_periodEnd.add(duration.toLong());
147
148                this.endTime = m_periodEnd.toDateTime();
149                periodChoice.setPeriodDuration(null);
150                periodChoice.setPeriodEnd(this.endTime.toDate());
151               
152                return this.endTime;
153        }
154
155        public ReadableDuration getExpectedDuration() throws NoSuchFieldException {
156                if(this.duration != null)
157                        return this.duration;
158               
159                ExecutionTimeType execTime = this.task.getExecutionTime();
160                if(execTime == null)
161                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
162                                                                                + " task "+ getId() + " is not defined.");
163               
164                org.exolab.castor.types.Duration d = execTime.getExecutionDuration();
165                if(d == null)
166                        throw new NoSuchFieldException("Execution Duration for job " + getJobId()
167                                        + " task "+ getId() + " is not defined.");
168
169                this.duration = new Duration(d.toLong());
170                return this.duration;
171        }
172
173        public String getJobId() {
174                return this.task.getJobId();
175        }
176
177        public double getParameterDoubleValue(ResourceParameterName parameterName)
178                        throws NoSuchFieldException, IllegalArgumentException {
179               
180                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
181               
182                switch (name) {
183                        case APPLICATION:
184                        case CPUARCH:
185                        case HOSTNAME:
186                        case LOCALRESOURCEMANAGER:
187                        case OSNAME:
188                        case OSRELEASE:
189                        case OSTYPE:
190                        case OSVERSION:
191                        case REMOTESUBMISSIONINTERFACE:
192                                throw new IllegalArgumentException("For " + parameterName + " use getParameterStringValue() method.");
193                }
194
195                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
196               
197                double returnValue = 0;
198                boolean notFound = true;
199               
200                for(int i = 0; i < item.length && notFound; i++){
201                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
202                        if(hostParameter == null)
203                                continue;
204                       
205                        if(name == hostParameter.getName()) {
206                                returnValue = hostParameter.getParameterTypeChoice().getParameterTypeChoiceItem(0).getParameterValue().getContent();
207                                notFound = false;
208                        }
209                }
210               
211                if(notFound)
212                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
213                                        + " task "+ getId() + " is not defined.");
214               
215                return returnValue;
216        }
217
218        public String getParameterStringValue(ResourceParameterName parameterName)
219                        throws NoSuchFieldException, IllegalArgumentException {
220                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
221               
222                switch (name) {
223                        case CPUCOUNT:
224                        case GPUCOUNT:
225                        case CPUSPEED:
226                        case DISKSPACE:
227                        case FREECPUS:
228                        case FREEDISKSPACE:
229                        case FREEMEMORY:
230                        case MEMORY:
231                                throw new IllegalArgumentException("For " + parameterName + " use getParameterDoubleValue() method.");
232                }
233               
234                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
235               
236                String returnValue = null;
237                boolean notFound = true;
238               
239                for(int i = 0; i < item.length && notFound; i++){
240                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
241                        if(hostParameter == null)
242                                continue;
243                       
244                        if(name == hostParameter.getName()) {
245                                returnValue = hostParameter.getStringValue(0).getValue();
246                                notFound = false;
247                        }
248                }
249               
250                if(notFound)
251                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
252                                        + " task "+ getId() + " is not defined.");
253               
254                return returnValue;
255        }
256
257        public DateTime getSubmissionTimeToBroker() {
258                if(this.brokerSubmitTime != null)
259                        return this.brokerSubmitTime;
260               
261                this.brokerSubmitTime = new DateTime(this.task.getSubmissionTime());
262               
263                return this.brokerSubmitTime;
264        }
265
266        public String getId() {
267                return this.task.getTaskId();
268        }
269
270        public String getUserDn() {
271                return this.task.getUserDN();
272        }
273
274        public org.qcg.broker.schemas.resreqs.Task getDescription() {
275                return this.task;
276        }
277
278        public String getDocument() throws Exception {
279                StringWriter writer = new StringWriter();
280               
281                marshaller.marshal(this.task, writer);
282               
283                return writer.toString();
284        }
285       
286        protected ComputingResourceBaseTypeItem[] getComputingResourceRequirements() throws NoSuchFieldException{
287               
288                Requirements req = this.task.getRequirements();
289                if(req == null)
290                        throw new NoSuchFieldException("Requierements section for job " + getJobId()
291                                                                                + " task "+ getId() + " is not defined.");
292               
293                TaskResourceRequirements taskReq = req.getTaskResourceRequirements();
294                if(taskReq == null)
295                        throw new NoSuchFieldException("Task resource requirements section for job " + getJobId()
296                                        + " task "+ getId() + " is not defined.");
297               
298                ComputingResource computingResource = taskReq.getComputingResource(0);
299                if(computingResource == null)
300                        throw new NoSuchFieldException("Computing resource requirement for job " + getJobId()
301                                        + " task "+ getId() + " is not defined.");
302               
303                ComputingResourceBaseTypeItem item[] = computingResource.getComputingResourceBaseTypeItem();
304                if(item == null || item.length == 0)
305                        throw new NoSuchFieldException("Computing resource requirement is empty for job " + getJobId()
306                                        + " task "+ getId());
307       
308                return item;
309        }
310       
311        public void setValue(ResourceParameterName parameterName, Object newValue) throws NoSuchFieldException{
312                boolean notFound = true;
313               
314                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
315               
316                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
317               
318                for(int i = 0; i < item.length && notFound; i++){
319                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
320                        if(hostParameter == null)
321                                continue;
322                       
323                        if(name == hostParameter.getName()) {
324                                hostParameter.
325                                                getParameterTypeChoice().
326                                                getParameterTypeChoiceItem(0).
327                                                getParameterValue().
328                                                setContent(((Integer)newValue).doubleValue());
329                                notFound = false;
330                        }
331                }
332               
333                if(notFound)
334                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
335                                        + " task "+ getId() + " is not defined.");
336        }
337
338        public List<AbstractProcessesGroup> getProcessesGroups() {
339                return this.groups;
340        }
341       
342        public List<AbstractProcesses> getProcesses(){
343                return this.processes;
344        }
345       
346        public List<AbstractProcesses> getProcesses(AbstractProcessesGroup processGroup){
347                if(this.processes == null)
348                        return null;
349               
350                List<AbstractProcesses> ret = new ArrayList<AbstractProcesses>();
351               
352                for(int i = 0; i < processes.size(); i++){
353                        AbstractProcesses p = processes.get(i);
354                        if(p.belongsTo(processGroup))
355                                ret.add(p);
356                }
357               
358                return ret;
359        }
360       
361        protected void prepareTopology(){
362                if(this.task.getRequirements() == null)
363                        return;
364               
365                if(this.task.getRequirements().getTopologyCount() < 1)
366                        return;
367               
368                Topology topology = this.task.getRequirements().getTopology(0);
369               
370                if(topology.getGroupCount() > 0){
371                        this.groups = new ArrayList<AbstractProcessesGroup>(topology.getGroupCount());
372                }
373               
374                for(int i = 0; i < topology.getGroupCount(); i++){
375                        this.groups.add(new ProcessesGroup(topology.getGroup(i)));
376                }
377
378                if(topology.getProcessesCount() > 0){
379                        this.processes = new ArrayList<AbstractProcesses>(topology.getProcessesCount());
380                }
381
382                for(int i = 0; i < topology.getProcessesCount(); i++){
383                        org.qcg.broker.schemas.resreqs.Processes p = topology.getProcesses(i);
384                        if(p.getProcessesResourceRequirements() == null){
385                                TaskResourceRequirements trr = this.task.getRequirements().getTaskResourceRequirements();
386                                if(trr != null) {
387                                        ProcessesResourceRequirements prr = new ProcessesResourceRequirements();
388                                       
389                                        for(int cridx = 0; cridx < trr.getComputingResourceCount(); cridx++){
390                                                ComputingResourceExtType cre = new ComputingResourceExtType();
391                                                ComputingResource cr = trr.getComputingResource(cridx);
392                                               
393                                                for(int j = 0; j < cr.getComputingResourceBaseTypeItemCount(); j++){
394                                                        cre.addComputingResourceBaseTypeItem(cr.getComputingResourceBaseTypeItem(j));
395                                                }
396                                               
397                                                prr.addComputingResource(cre);
398                                        }
399                                       
400                                        p.setProcessesResourceRequirements(prr);
401                                }
402                        }
403                       
404                        this.processes.add(new Processes(p));
405                }
406        }
407
408        public double getCpuCntRequest() throws NoSuchFieldException {
409                return getParameterDoubleValue(ResourceParameterName.CPUCOUNT);
410        }
411
412        public double getMemoryRequest() throws NoSuchFieldException {
413                return getParameterDoubleValue(ResourceParameterName.MEMORY);
414        }
415       
416        public long getLength() {
417                return this.length;
418        }
419
420        public int getStatus() {
421                return this.status;
422        }
423
424        public void setLength(long length) {
425                this.length = length;
426        }
427
428        public void setStatus(int status){
429                this.status = status;
430        }
431       
432        public void setSenderId(int id){
433                this.senderId = id;
434        }
435       
436        public int getSenderId(){
437                return this.senderId;
438        }
439       
440        public boolean isFinished(){
441                if(processes == null)
442                        return (status > 3 && status <= 6);
443               
444                for(int i = 0; i < processes.size(); i++){
445                        if(!processes.get(i).isFinished())
446                                return false;
447                }
448               
449                return true;
450        }
451       
452        public long getWorkloadLogWaitTime() {
453                return workloadLogWaitTime;
454        }
455
456        public void setWorkloadLogWaitTime(long waitTime) {
457                this.workloadLogWaitTime = waitTime;
458        }
459       
460        /*public void addToResPath(String resName){
461                if(resPathHistory == null)
462                        resPathHistory = new String();
463                resPathHistory = new StringBuffer(resPathHistory).append(resName).append("_").toString();
464
465        }
466       
467        public String getResPath(){
468                return this.resPathHistory;
469
470        }*/
471
472
473        @Override
474        public int getUserID() {
475                // TODO Auto-generated method stub
476                return 0;
477        }
478
479        public boolean isRegistered() {
480                return isRegistered;
481        }
482
483        public void register(JobRegistryImpl jobRegistry) {
484                isRegistered = true;
485        }
486
487        public void accept(WorkloadUnitHandler wuh) {
488                wuh.handleTask(this);
489        }
490}
Note: See TracBrowser for help on using the repository browser.