source: DCWoRMS/trunk/src/schedframe/scheduling/tasks/Task.java @ 490

Revision 490, 14.2 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.tasks;
2
3import org.qcg.broker.schemas.resreqs.ComputingResource;
4import org.qcg.broker.schemas.resreqs.ComputingResourceBaseTypeItem;
5import org.qcg.broker.schemas.resreqs.ComputingResourceExtType;
6import org.qcg.broker.schemas.resreqs.ComputingResourceParameterType;
7import org.qcg.broker.schemas.resreqs.ExecutionTimeType;
8import org.qcg.broker.schemas.resreqs.ProcessesResourceRequirements;
9import org.qcg.broker.schemas.resreqs.Requirements;
10import org.qcg.broker.schemas.resreqs.TaskResourceRequirements;
11import org.qcg.broker.schemas.resreqs.TimePeriod;
12import org.qcg.broker.schemas.resreqs.TimePeriodChoice;
13import org.qcg.broker.schemas.resreqs.Topology;
14import org.qcg.broker.schemas.resreqs.types.ComputingResourceParameterTypeNameType;
15
16import java.io.StringReader;
17import java.io.StringWriter;
18import java.util.ArrayList;
19import java.util.List;
20
21import org.exolab.castor.xml.Marshaller;
22import org.exolab.castor.xml.ResolverException;
23import org.exolab.castor.xml.Unmarshaller;
24import org.exolab.castor.xml.XMLContext;
25import org.joda.time.DateTime;
26import org.joda.time.Duration;
27import org.joda.time.MutableDateTime;
28import org.joda.time.ReadableDuration;
29
30import schedframe.scheduling.WorkloadUnitHandler;
31import schedframe.scheduling.manager.tasks.JobRegistryImpl;
32import schedframe.scheduling.tasks.requirements.ResourceParameterName;
33
34/**
35 *
36 * @author Marcin Krystek
37 *
38 */
39public class Task implements TaskInterface<org.qcg.broker.schemas.resreqs.Task> {
40       
41        protected static Unmarshaller unmarshaller;
42        protected static Marshaller marshaller;
43        protected boolean isRegistered;
44       
45        static {
46                XMLContext context = new XMLContext();
47                try {
48                        context.addClass(org.qcg.broker.schemas.resreqs.Task.class);
49                        unmarshaller = context.createUnmarshaller();
50                        marshaller = context.createMarshaller();
51                } catch (ResolverException e) {
52                        e.printStackTrace();
53                        unmarshaller = null;
54                        marshaller = null;
55                }
56        }
57       
58        protected org.qcg.broker.schemas.resreqs.Task task;
59        /*
60         * The values for following variables are obtained from native Task
61         * object. This should significantly speed up access to task details.
62         */
63        private DateTime startTime;
64        private DateTime endTime;
65        private DateTime brokerSubmitTime;
66        private ReadableDuration duration;
67        private List<AbstractProcessesGroup> groups;
68        private List<AbstractProcesses> processes;
69        private long length;
70        private int status;
71        private int senderId;
72        private long workloadLogWaitTime;
73
74        public Task(org.qcg.broker.schemas.resreqs.Task task){
75                this.task = task;
76                this.startTime = null;
77                this.endTime = null;
78                this.brokerSubmitTime = null;
79                this.duration = null;
80                prepareTopology();
81        }
82       
83        public Task(String task) throws Exception{
84                StringReader reader = new StringReader(task);
85                this.task = (org.qcg.broker.schemas.resreqs.Task) unmarshaller.unmarshal(reader);
86                this.startTime = null;
87                this.endTime = null;
88                this.brokerSubmitTime = null;
89                this.duration = null;
90                prepareTopology();
91        }
92
93        public DateTime getExecutionStartTime() throws NoSuchFieldException {
94                if(this.startTime != null)
95                        return this.startTime;
96               
97                ExecutionTimeType execTime = this.task.getExecutionTime();
98                if(execTime == null)
99                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
100                                                                                + " task "+ getId() + " is not defined.");
101               
102                TimePeriod timePeriod = execTime.getTimePeriod();
103                if(timePeriod == null)
104                        throw new NoSuchFieldException("Time Period for job " + getJobId()
105                                        + " task "+ getId() + " is not defined.");
106               
107                this.startTime = new DateTime(timePeriod.getPeriodStart());
108                return this.startTime;
109        }
110
111        public DateTime getExecutionEndTime() throws NoSuchFieldException {
112                if(this.endTime != null)
113                        return this.endTime;
114               
115                ExecutionTimeType execTime = this.task.getExecutionTime();
116                if(execTime == null)
117                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
118                                                                                + " task "+ getId() + " is not defined.");
119               
120                TimePeriod timePeriod = execTime.getTimePeriod();
121                if(timePeriod == null)
122                        throw new NoSuchFieldException("Time Period for job " + getJobId()
123                                        + " task "+ getId() + " is not defined.");
124               
125                TimePeriodChoice periodChoice = timePeriod.getTimePeriodChoice();
126                if(periodChoice == null)
127                        throw new NoSuchFieldException("Period End and Period Duration for job " + getJobId()
128                                        + " task "+ getId() + " are not defined.");
129               
130                java.util.Date periodEnd = periodChoice.getPeriodEnd();
131                if(periodEnd != null) {
132                        this.endTime = new DateTime(periodEnd);
133                        return this.endTime;
134                }
135               
136                org.exolab.castor.types.Duration duration = periodChoice.getPeriodDuration();
137                if(duration == null)
138                        throw new NoSuchFieldException("Period Duration for job " + getJobId()
139                                        + " task "+ getId() + " is not defined.");
140               
141                DateTime periodStart = getExecutionStartTime();
142                MutableDateTime m_periodEnd = periodStart.toMutableDateTime();
143                m_periodEnd.add(duration.toLong());
144
145                this.endTime = m_periodEnd.toDateTime();
146                periodChoice.setPeriodDuration(null);
147                periodChoice.setPeriodEnd(this.endTime.toDate());
148               
149                return this.endTime;
150        }
151
152        public ReadableDuration getExpectedDuration() throws NoSuchFieldException {
153                if(this.duration != null)
154                        return this.duration;
155               
156                ExecutionTimeType execTime = this.task.getExecutionTime();
157                if(execTime == null)
158                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
159                                                                                + " task "+ getId() + " is not defined.");
160               
161                org.exolab.castor.types.Duration d = execTime.getExecutionDuration();
162                if(d == null)
163                        throw new NoSuchFieldException("Execution Duration for job " + getJobId()
164                                        + " task "+ getId() + " is not defined.");
165
166                this.duration = new Duration(d.toLong());
167                return this.duration;
168        }
169
170        public String getJobId() {
171                return this.task.getJobId();
172        }
173
174        public double getParameterDoubleValue(ResourceParameterName parameterName)
175                        throws NoSuchFieldException, IllegalArgumentException {
176               
177                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
178               
179                switch (name) {
180                        case APPLICATION:
181                        case CPUARCH:
182                        case HOSTNAME:
183                        case LOCALRESOURCEMANAGER:
184                        case OSNAME:
185                        case OSRELEASE:
186                        case OSTYPE:
187                        case OSVERSION:
188                        case REMOTESUBMISSIONINTERFACE:
189                                throw new IllegalArgumentException("For " + parameterName + " use getParameterStringValue() method.");
190                }
191
192                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
193               
194                double returnValue = 0;
195                boolean notFound = true;
196               
197                for(int i = 0; i < item.length && notFound; i++){
198                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
199                        if(hostParameter == null)
200                                continue;
201                       
202                        if(name == hostParameter.getName()) {
203                                returnValue = hostParameter.getParameterTypeChoice().getParameterTypeChoiceItem(0).getParameterValue().getContent();
204                                notFound = false;
205                        }
206                }
207               
208                if(notFound)
209                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
210                                        + " task "+ getId() + " is not defined.");
211               
212                return returnValue;
213        }
214
215        public String getParameterStringValue(ResourceParameterName parameterName)
216                        throws NoSuchFieldException, IllegalArgumentException {
217                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
218               
219                switch (name) {
220                        case CPUCOUNT:
221                        case GPUCOUNT:
222                        case CPUSPEED:
223                        case DISKSPACE:
224                        case FREECPUS:
225                        case FREEDISKSPACE:
226                        case FREEMEMORY:
227                        case MEMORY:
228                                throw new IllegalArgumentException("For " + parameterName + " use getParameterDoubleValue() method.");
229                }
230               
231                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
232               
233                String returnValue = null;
234                boolean notFound = true;
235               
236                for(int i = 0; i < item.length && notFound; i++){
237                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
238                        if(hostParameter == null)
239                                continue;
240                       
241                        if(name == hostParameter.getName()) {
242                                returnValue = hostParameter.getStringValue(0).getValue();
243                                notFound = false;
244                        }
245                }
246               
247                if(notFound)
248                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
249                                        + " task "+ getId() + " is not defined.");
250               
251                return returnValue;
252        }
253
254        public DateTime getSubmissionTimeToBroker() {
255                if(this.brokerSubmitTime != null)
256                        return this.brokerSubmitTime;
257               
258                this.brokerSubmitTime = new DateTime(this.task.getSubmissionTime());
259               
260                return this.brokerSubmitTime;
261        }
262
263        public String getId() {
264                return this.task.getTaskId();
265        }
266
267        public String getUserDN() {
268                return this.task.getUserDN();
269        }
270
271        public org.qcg.broker.schemas.resreqs.Task getDescription() {
272                return this.task;
273        }
274
275        public String getDocument() throws Exception {
276                StringWriter writer = new StringWriter();
277               
278                marshaller.marshal(this.task, writer);
279               
280                return writer.toString();
281        }
282       
283        protected ComputingResourceBaseTypeItem[] getComputingResourceRequirements() throws NoSuchFieldException{
284               
285                Requirements req = this.task.getRequirements();
286                if(req == null)
287                        throw new NoSuchFieldException("Requierements section for job " + getJobId()
288                                                                                + " task "+ getId() + " is not defined.");
289               
290                TaskResourceRequirements taskReq = req.getTaskResourceRequirements();
291                if(taskReq == null)
292                        throw new NoSuchFieldException("Task resource requirements section for job " + getJobId()
293                                        + " task "+ getId() + " is not defined.");
294               
295                ComputingResource computingResource = taskReq.getComputingResource(0);
296                if(computingResource == null)
297                        throw new NoSuchFieldException("Computing resource requirement for job " + getJobId()
298                                        + " task "+ getId() + " is not defined.");
299               
300                ComputingResourceBaseTypeItem item[] = computingResource.getComputingResourceBaseTypeItem();
301                if(item == null || item.length == 0)
302                        throw new NoSuchFieldException("Computing resource requirement is empty for job " + getJobId()
303                                        + " task "+ getId());
304       
305                return item;
306        }
307       
308        public void setValue(ResourceParameterName parameterName, Object newValue) throws NoSuchFieldException{
309                boolean notFound = true;
310               
311                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
312               
313                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
314               
315                for(int i = 0; i < item.length && notFound; i++){
316                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
317                        if(hostParameter == null)
318                                continue;
319                       
320                        if(name == hostParameter.getName()) {
321                                hostParameter.
322                                                getParameterTypeChoice().
323                                                getParameterTypeChoiceItem(0).
324                                                getParameterValue().
325                                                setContent(((Integer)newValue).doubleValue());
326                                notFound = false;
327                        }
328                }
329               
330                if(notFound)
331                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
332                                        + " task "+ getId() + " is not defined.");
333        }
334
335        public List<AbstractProcessesGroup> getProcessesGroups() {
336                return this.groups;
337        }
338       
339        public List<AbstractProcesses> getProcesses(){
340                return this.processes;
341        }
342       
343        public List<AbstractProcesses> getProcesses(AbstractProcessesGroup processGroup){
344                if(this.processes == null)
345                        return null;
346               
347                List<AbstractProcesses> ret = new ArrayList<AbstractProcesses>();
348               
349                for(int i = 0; i < processes.size(); i++){
350                        AbstractProcesses p = processes.get(i);
351                        if(p.belongsTo(processGroup))
352                                ret.add(p);
353                }
354               
355                return ret;
356        }
357       
358        protected void prepareTopology(){
359                if(this.task.getRequirements() == null)
360                        return;
361               
362                if(this.task.getRequirements().getTopologyCount() < 1)
363                        return;
364               
365                Topology topology = this.task.getRequirements().getTopology(0);
366               
367                if(topology.getGroupCount() > 0){
368                        this.groups = new ArrayList<AbstractProcessesGroup>(topology.getGroupCount());
369                }
370               
371                for(int i = 0; i < topology.getGroupCount(); i++){
372                        this.groups.add(new ProcessesGroup(topology.getGroup(i)));
373                }
374
375                if(topology.getProcessesCount() > 0){
376                        this.processes = new ArrayList<AbstractProcesses>(topology.getProcessesCount());
377                }
378
379                for(int i = 0; i < topology.getProcessesCount(); i++){
380                        org.qcg.broker.schemas.resreqs.Processes p = topology.getProcesses(i);
381                        if(p.getProcessesResourceRequirements() == null){
382                                TaskResourceRequirements trr = this.task.getRequirements().getTaskResourceRequirements();
383                                if(trr != null) {
384                                        ProcessesResourceRequirements prr = new ProcessesResourceRequirements();
385                                       
386                                        for(int cridx = 0; cridx < trr.getComputingResourceCount(); cridx++){
387                                                ComputingResourceExtType cre = new ComputingResourceExtType();
388                                                ComputingResource cr = trr.getComputingResource(cridx);
389                                               
390                                                for(int j = 0; j < cr.getComputingResourceBaseTypeItemCount(); j++){
391                                                        cre.addComputingResourceBaseTypeItem(cr.getComputingResourceBaseTypeItem(j));
392                                                }
393                                               
394                                                prr.addComputingResource(cre);
395                                        }
396                                       
397                                        p.setProcessesResourceRequirements(prr);
398                                }
399                        }
400                       
401                        this.processes.add(new Processes(p));
402                }
403        }
404
405        public double getCpuCntRequest() throws NoSuchFieldException {
406                return getParameterDoubleValue(ResourceParameterName.CPUCOUNT);
407        }
408
409        public double getMemoryRequest() throws NoSuchFieldException {
410                return getParameterDoubleValue(ResourceParameterName.MEMORY);
411        }
412       
413        public long getLength() {
414                return this.length;
415        }
416
417        public int getStatus() {
418                return this.status;
419        }
420
421        public void setLength(long length) {
422                this.length = length;
423        }
424
425        public void setStatus(int status){
426                this.status = status;
427        }
428       
429        public void setSenderId(int id){
430                this.senderId = id;
431        }
432       
433        public int getSenderId(){
434                return this.senderId;
435        }
436       
437        public boolean isFinished(){
438                if(processes == null)
439                        return (status > 3 && status <= 6);
440               
441                for(int i = 0; i < processes.size(); i++){
442                        if(!processes.get(i).isFinished())
443                                return false;
444                }
445               
446                return true;
447        }
448       
449        public long getWorkloadLogWaitTime() {
450                return workloadLogWaitTime;
451        }
452
453        public void setWorkloadLogWaitTime(long waitTime) {
454                this.workloadLogWaitTime = waitTime;
455        }
456       
457        /*public void addToResPath(String resName){
458                if(resPathHistory == null)
459                        resPathHistory = new String();
460                resPathHistory = new StringBuffer(resPathHistory).append(resName).append("_").toString();
461
462        }
463       
464        public String getResPath(){
465                return this.resPathHistory;
466
467        }*/
468
469
470        @Override
471        public int getUserId() {
472                // TODO Auto-generated method stub
473                return 0;
474        }
475
476        public boolean isRegistered() {
477                return isRegistered;
478        }
479
480        public void register(JobRegistryImpl jobRegistry) {
481                isRegistered = true;
482        }
483
484        public void accept(WorkloadUnitHandler wuh) {
485                wuh.handleTask(this);
486        }
487}
Note: See TracBrowser for help on using the repository browser.