source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/tasks/Task.java @ 1194

Revision 1194, 15.7 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.tasks;
2
3import java.io.StringReader;
4import java.io.StringWriter;
5import java.util.ArrayList;
6import java.util.LinkedList;
7import java.util.List;
8
9import org.exolab.castor.xml.Marshaller;
10import org.exolab.castor.xml.ResolverException;
11import org.exolab.castor.xml.Unmarshaller;
12import org.exolab.castor.xml.XMLContext;
13import org.joda.time.DateTime;
14import org.joda.time.Duration;
15import org.joda.time.MutableDateTime;
16import org.joda.time.ReadableDuration;
17import org.qcg.broker.schemas.resreqs.ComputingResource;
18import org.qcg.broker.schemas.resreqs.ComputingResourceBaseTypeItem;
19import org.qcg.broker.schemas.resreqs.ComputingResourceExtType;
20import org.qcg.broker.schemas.resreqs.ComputingResourceParameterType;
21import org.qcg.broker.schemas.resreqs.ExecutionTimeType;
22import org.qcg.broker.schemas.resreqs.ProcessesResourceRequirements;
23import org.qcg.broker.schemas.resreqs.Requirements;
24import org.qcg.broker.schemas.resreqs.ResourceConsumptionType;
25import org.qcg.broker.schemas.resreqs.TaskResourceRequirements;
26import org.qcg.broker.schemas.resreqs.TimePeriod;
27import org.qcg.broker.schemas.resreqs.TimePeriodChoice;
28import org.qcg.broker.schemas.resreqs.Topology;
29import org.qcg.broker.schemas.resreqs.types.ComputingResourceParameterTypeNameType;
30
31import schedframe.scheduling.WorkloadUnitHandler;
32import schedframe.scheduling.manager.tasks.JobRegistryImpl;
33import schedframe.scheduling.tasks.phases.ResourceConsumption;
34import schedframe.scheduling.tasks.phases.ResourceConsumptionProfile;
35import schedframe.scheduling.tasks.requirements.ResourceParameterName;
36
37/**
38 *
39 * @author Marcin Krystek
40 *
41 */
42public class Task implements TaskInterface<org.qcg.broker.schemas.resreqs.Task> {
43       
44        protected static Unmarshaller unmarshaller;
45        protected static Marshaller marshaller;
46        protected boolean isRegistered;
47       
48        static {
49                XMLContext context = new XMLContext();
50                try {
51                        context.addClass(org.qcg.broker.schemas.resreqs.Task.class);
52                        unmarshaller = context.createUnmarshaller();
53                        marshaller = context.createMarshaller();
54                } catch (ResolverException e) {
55                        e.printStackTrace();
56                        unmarshaller = null;
57                        marshaller = null;
58                }
59        }
60       
61        protected org.qcg.broker.schemas.resreqs.Task task;
62        /*
63         * The values for following variables are obtained from native Task
64         * object. This should significantly speed up access to task details.
65         */
66        private DateTime startTime;
67        private DateTime endTime;
68        private DateTime brokerSubmitTime;
69        private ReadableDuration duration;
70        private List<AbstractProcessesGroup> groups;
71        private List<AbstractProcesses> processes;
72        private long length;
73        private int status;
74        private int senderId;
75        private long workloadLogWaitTime;
76       
77        private ResourceConsumptionProfile resourceConsumptionProfile;
78
79        public Task(org.qcg.broker.schemas.resreqs.Task task){
80                this.task = task;
81                this.startTime = null;
82                this.endTime = null;
83                this.brokerSubmitTime = null;
84                this.duration = null;
85                prepareTopology();
86        }
87       
88        public Task(String task) throws Exception{
89                StringReader reader = new StringReader(task);
90                this.task = (org.qcg.broker.schemas.resreqs.Task) unmarshaller.unmarshal(reader);
91                this.startTime = null;
92                this.endTime = null;
93                this.brokerSubmitTime = null;
94                this.duration = null;
95                prepareTopology();
96        }
97
98        private void preparePhases() {
99                LinkedList<ResourceConsumption> resourceConsumptionList = new LinkedList<ResourceConsumption>();
100               
101                if(task.getExecution() == null || task.getExecution().getResourceConsumptionProfile() == null){
102                        ResourceConsumption resConsumption = null;
103                        try {
104                                resConsumption = new ResourceConsumption(this.length, getComputingResourceRequirements());
105                        } catch (NoSuchFieldException e) {
106                                // TODO Auto-generated catch block
107                                e.printStackTrace();
108                        }
109                        resourceConsumptionList.add(resConsumption);
110                }
111                else{
112                        for(ResourceConsumptionType resConsumption: task.getExecution().getResourceConsumptionProfile().getResourceConsumption()){
113                                ResourceConsumption resourceConsumption = new ResourceConsumption(resConsumption);
114                                resourceConsumptionList.add(resourceConsumption);
115                        }
116                }
117                this.resourceConsumptionProfile = new ResourceConsumptionProfile(resourceConsumptionList);
118               
119        //      System.out.println("======"+task.getExecution().getExecutable().getApplication().getName());
120        }
121       
122        public DateTime getExecutionStartTime() throws NoSuchFieldException {
123                if(this.startTime != null)
124                        return this.startTime;
125               
126                ExecutionTimeType execTime = this.task.getExecutionTime();
127                if(execTime == null)
128                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
129                                                                                + " task "+ getId() + " is not defined.");
130               
131                TimePeriod timePeriod = execTime.getTimePeriod();
132                if(timePeriod == null)
133                        throw new NoSuchFieldException("Time Period for job " + getJobId()
134                                        + " task "+ getId() + " is not defined.");
135               
136                this.startTime = new DateTime(timePeriod.getPeriodStart());
137                return this.startTime;
138        }
139
140        public DateTime getExecutionEndTime() throws NoSuchFieldException {
141                if(this.endTime != null)
142                        return this.endTime;
143               
144                ExecutionTimeType execTime = this.task.getExecutionTime();
145                if(execTime == null)
146                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
147                                                                                + " task "+ getId() + " is not defined.");
148               
149                TimePeriod timePeriod = execTime.getTimePeriod();
150                if(timePeriod == null)
151                        throw new NoSuchFieldException("Time Period for job " + getJobId()
152                                        + " task "+ getId() + " is not defined.");
153               
154                TimePeriodChoice periodChoice = timePeriod.getTimePeriodChoice();
155                if(periodChoice == null)
156                        throw new NoSuchFieldException("Period End and Period Duration for job " + getJobId()
157                                        + " task "+ getId() + " are not defined.");
158               
159                java.util.Date periodEnd = periodChoice.getPeriodEnd();
160                if(periodEnd != null) {
161                        this.endTime = new DateTime(periodEnd);
162                        return this.endTime;
163                }
164               
165                org.exolab.castor.types.Duration duration = periodChoice.getPeriodDuration();
166                if(duration == null)
167                        throw new NoSuchFieldException("Period Duration for job " + getJobId()
168                                        + " task "+ getId() + " is not defined.");
169               
170                DateTime periodStart = getExecutionStartTime();
171                MutableDateTime m_periodEnd = periodStart.toMutableDateTime();
172                m_periodEnd.add(duration.toLong());
173
174                this.endTime = m_periodEnd.toDateTime();
175                periodChoice.setPeriodDuration(null);
176                periodChoice.setPeriodEnd(this.endTime.toDate());
177               
178                return this.endTime;
179        }
180
181        public ReadableDuration getExpectedDuration() throws NoSuchFieldException {
182                if(this.duration != null)
183                        return this.duration;
184               
185                ExecutionTimeType execTime = this.task.getExecutionTime();
186                if(execTime == null)
187                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
188                                                                                + " task "+ getId() + " is not defined.");
189               
190                org.exolab.castor.types.Duration d = execTime.getExecutionDuration();
191                if(d == null)
192                        throw new NoSuchFieldException("Execution Duration for job " + getJobId()
193                                        + " task "+ getId() + " is not defined.");
194
195                this.duration = new Duration(d.toLong());
196                return this.duration;
197        }
198
199        public String getJobId() {
200                return this.task.getJobId();
201        }
202
203        public double getParameterDoubleValue(ResourceParameterName parameterName)
204                        throws NoSuchFieldException, IllegalArgumentException {
205               
206                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
207               
208                switch (name) {
209                        case APPLICATION:
210                        case CPUARCH:
211                        case HOSTNAME:
212                        case LOCALRESOURCEMANAGER:
213                        case OSNAME:
214                        case OSRELEASE:
215                        case OSTYPE:
216                        case OSVERSION:
217                        case REMOTESUBMISSIONINTERFACE:
218                                throw new IllegalArgumentException("For " + parameterName + " use getParameterStringValue() method.");
219                }
220
221                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
222               
223                double returnValue = 0;
224                boolean notFound = true;
225               
226                for(int i = 0; i < item.length && notFound; i++){
227                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
228                        if(hostParameter == null)
229                                continue;
230                       
231                        if(name == hostParameter.getName()) {
232                                returnValue = hostParameter.getParameterTypeChoice().getParameterTypeChoiceItem(0).getParameterValue().getContent();
233                                notFound = false;
234                        }
235                }
236               
237                if(notFound)
238                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
239                                        + " task "+ getId() + " is not defined.");
240               
241                return returnValue;
242        }
243
244        public String getParameterStringValue(ResourceParameterName parameterName)
245                        throws NoSuchFieldException, IllegalArgumentException {
246                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
247               
248                switch (name) {
249                        case CPUCOUNT:
250                        case GPUCOUNT:
251                        case CPUSPEED:
252                        case DISKSPACE:
253                        case FREECPUS:
254                        case FREEDISKSPACE:
255                        case FREEMEMORY:
256                        case MEMORY:
257                                throw new IllegalArgumentException("For " + parameterName + " use getParameterDoubleValue() method.");
258                }
259               
260                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
261               
262                String returnValue = null;
263                boolean notFound = true;
264               
265                for(int i = 0; i < item.length && notFound; i++){
266                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
267                        if(hostParameter == null)
268                                continue;
269                       
270                        if(name == hostParameter.getName()) {
271                                returnValue = hostParameter.getStringValue(0).getValue();
272                                notFound = false;
273                        }
274                }
275               
276                if(notFound)
277                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
278                                        + " task "+ getId() + " is not defined.");
279               
280                return returnValue;
281        }
282
283        public DateTime getSubmissionTimeToBroker() {
284                if(this.brokerSubmitTime != null)
285                        return this.brokerSubmitTime;
286               
287                this.brokerSubmitTime = new DateTime(this.task.getSubmissionTime());
288               
289                return this.brokerSubmitTime;
290        }
291
292        public String getId() {
293                return this.task.getId();
294        }
295
296        public String getUserDN() {
297                return this.task.getUserDN();
298        }
299
300        public org.qcg.broker.schemas.resreqs.Task getDescription() {
301                return this.task;
302        }
303
304        public String getDocument() throws Exception {
305                StringWriter writer = new StringWriter();
306               
307                marshaller.marshal(this.task, writer);
308               
309                return writer.toString();
310        }
311       
312        protected ComputingResourceBaseTypeItem[] getComputingResourceRequirements() throws NoSuchFieldException{
313               
314                Requirements req = this.task.getRequirements();
315                if(req == null)
316                        throw new NoSuchFieldException("Requierements section for job " + getJobId()
317                                                                                + " task "+ getId() + " is not defined.");
318               
319                TaskResourceRequirements taskReq = req.getTaskResourceRequirements();
320                if(taskReq == null)
321                        throw new NoSuchFieldException("Task resource requirements section for job " + getJobId()
322                                        + " task "+ getId() + " is not defined.");
323               
324                ComputingResource computingResource = taskReq.getComputingResource(0);
325                if(computingResource == null)
326                        throw new NoSuchFieldException("Computing resource requirement for job " + getJobId()
327                                        + " task "+ getId() + " is not defined.");
328               
329                ComputingResourceBaseTypeItem item[] = computingResource.getComputingResourceBaseTypeItem();
330                if(item == null || item.length == 0)
331                        throw new NoSuchFieldException("Computing resource requirement is empty for job " + getJobId()
332                                        + " task "+ getId());
333       
334                return item;
335        }
336       
337        public void setValue(ResourceParameterName parameterName, Object newValue) throws NoSuchFieldException{
338                boolean notFound = true;
339               
340                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
341               
342                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
343               
344                for(int i = 0; i < item.length && notFound; i++){
345                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
346                        if(hostParameter == null)
347                                continue;
348                       
349                        if(name == hostParameter.getName()) {
350                                hostParameter.
351                                                getParameterTypeChoice().
352                                                getParameterTypeChoiceItem(0).
353                                                getParameterValue().
354                                                setContent(((Integer)newValue).doubleValue());
355                                notFound = false;
356                        }
357                }
358               
359                if(notFound)
360                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
361                                        + " task "+ getId() + " is not defined.");
362        }
363
364        public List<AbstractProcessesGroup> getProcessesGroups() {
365                return this.groups;
366        }
367       
368        public List<AbstractProcesses> getProcesses(){
369                return this.processes;
370        }
371       
372        public List<AbstractProcesses> getProcesses(AbstractProcessesGroup processGroup){
373                if(this.processes == null)
374                        return null;
375               
376                List<AbstractProcesses> ret = new ArrayList<AbstractProcesses>();
377               
378                for(int i = 0; i < processes.size(); i++){
379                        AbstractProcesses p = processes.get(i);
380                        if(p.belongsTo(processGroup))
381                                ret.add(p);
382                }
383               
384                return ret;
385        }
386       
387        protected void prepareTopology(){
388                if(this.task.getRequirements() == null)
389                        return;
390               
391                if(this.task.getRequirements().getTopologyCount() < 1)
392                        return;
393               
394                Topology topology = this.task.getRequirements().getTopology(0);
395               
396                if(topology.getGroupCount() > 0){
397                        this.groups = new ArrayList<AbstractProcessesGroup>(topology.getGroupCount());
398                }
399               
400                for(int i = 0; i < topology.getGroupCount(); i++){
401                        this.groups.add(new ProcessesGroup(topology.getGroup(i)));
402                }
403
404                if(topology.getProcessesCount() > 0){
405                        this.processes = new ArrayList<AbstractProcesses>(topology.getProcessesCount());
406                }
407
408                for(int i = 0; i < topology.getProcessesCount(); i++){
409                        org.qcg.broker.schemas.resreqs.Processes p = topology.getProcesses(i);
410                        if(p.getProcessesResourceRequirements() == null){
411                                TaskResourceRequirements trr = this.task.getRequirements().getTaskResourceRequirements();
412                                if(trr != null) {
413                                        ProcessesResourceRequirements prr = new ProcessesResourceRequirements();
414                                       
415                                        for(int cridx = 0; cridx < trr.getComputingResourceCount(); cridx++){
416                                                ComputingResourceExtType cre = new ComputingResourceExtType();
417                                                ComputingResource cr = trr.getComputingResource(cridx);
418                                               
419                                                for(int j = 0; j < cr.getComputingResourceBaseTypeItemCount(); j++){
420                                                        cre.addComputingResourceBaseTypeItem(cr.getComputingResourceBaseTypeItem(j));
421                                                }
422                                               
423                                                prr.addComputingResource(cre);
424                                        }
425                                       
426                                        p.setProcessesResourceRequirements(prr);
427                                }
428                        }
429                       
430                        this.processes.add(new Processes(p));
431                }
432        }
433
434        public double getCpuCntRequest() throws NoSuchFieldException {
435                return getParameterDoubleValue(ResourceParameterName.CPUCOUNT);
436        }
437
438        public double getMemoryRequest() throws NoSuchFieldException {
439                return getParameterDoubleValue(ResourceParameterName.MEMORY);
440        }
441       
442        public long getLength() {
443                return this.length;
444        }
445
446        public int getStatus() {
447                return this.status;
448        }
449
450        public void setLength(long length) {
451                this.length = length;
452                preparePhases();
453        }
454
455        public void setStatus(int status){
456                this.status = status;
457        }
458       
459        public void setSenderId(int id){
460                this.senderId = id;
461        }
462       
463        public int getSenderId(){
464                return this.senderId;
465        }
466       
467        public boolean isFinished(){
468                if(processes == null)
469                        return (status > 3 && status <= 6);
470               
471                for(int i = 0; i < processes.size(); i++){
472                        if(!processes.get(i).isFinished())
473                                return false;
474                }
475               
476                return true;
477        }
478       
479        public long getWorkloadLogWaitTime() {
480                return workloadLogWaitTime;
481        }
482
483        public void setWorkloadLogWaitTime(long waitTime) {
484                this.workloadLogWaitTime = waitTime;
485        }
486       
487        /*public void addToResPath(String resName){
488                if(resPathHistory == null)
489                        resPathHistory = new String();
490                resPathHistory = new StringBuffer(resPathHistory).append(resName).append("_").toString();
491
492        }
493       
494        public String getResPath(){
495                return this.resPathHistory;
496
497        }*/
498
499
500        @Override
501        public int getUserId() {
502                // TODO Auto-generated method stub
503                return 0;
504        }
505
506        public boolean isRegistered() {
507                return isRegistered;
508        }
509
510        public void register(JobRegistryImpl jobRegistry) {
511                isRegistered = true;
512        }
513
514        public void accept(WorkloadUnitHandler wuh) {
515                wuh.handleTask(this);
516        }
517       
518        public ResourceConsumptionProfile getResourceConsumptionProfile(){
519                return resourceConsumptionProfile;
520        }
521       
522        public String getApplicationName(){
523                try{
524                        return task.getExecution().getExecutable().getApplication().getName();
525                }catch (Exception e){
526                        return null;
527                }
528        }
529}
Note: See TracBrowser for help on using the repository browser.