source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/tasks/Task.java @ 887

Revision 887, 15.7 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.tasks;
2
3import java.io.StringReader;
4import java.io.StringWriter;
5import java.util.ArrayList;
6import java.util.List;
7
8import org.exolab.castor.xml.Marshaller;
9import org.exolab.castor.xml.ResolverException;
10import org.exolab.castor.xml.Unmarshaller;
11import org.exolab.castor.xml.XMLContext;
12import org.joda.time.DateTime;
13import org.joda.time.Duration;
14import org.joda.time.MutableDateTime;
15import org.joda.time.ReadableDuration;
16import org.qcg.broker.schemas.resreqs.ComputingResource;
17import org.qcg.broker.schemas.resreqs.ComputingResourceBaseTypeItem;
18import org.qcg.broker.schemas.resreqs.ComputingResourceExtType;
19import org.qcg.broker.schemas.resreqs.ComputingResourceParameterType;
20import org.qcg.broker.schemas.resreqs.ExecutionTimeType;
21import org.qcg.broker.schemas.resreqs.ProcessesResourceRequirements;
22import org.qcg.broker.schemas.resreqs.Requirements;
23import org.qcg.broker.schemas.resreqs.ResourceConsumptionType;
24import org.qcg.broker.schemas.resreqs.TaskResourceRequirements;
25import org.qcg.broker.schemas.resreqs.TimePeriod;
26import org.qcg.broker.schemas.resreqs.TimePeriodChoice;
27import org.qcg.broker.schemas.resreqs.Topology;
28import org.qcg.broker.schemas.resreqs.types.ComputingResourceParameterTypeNameType;
29
30import schedframe.scheduling.WorkloadUnitHandler;
31import schedframe.scheduling.manager.tasks.JobRegistryImpl;
32import schedframe.scheduling.tasks.phases.ResourceConsumption;
33import schedframe.scheduling.tasks.phases.ResourceConsumptionProfile;
34import schedframe.scheduling.tasks.requirements.ResourceParameterName;
35
36/**
37 *
38 * @author Marcin Krystek
39 *
40 */
41public class Task implements TaskInterface<org.qcg.broker.schemas.resreqs.Task> {
42       
43        protected static Unmarshaller unmarshaller;
44        protected static Marshaller marshaller;
45        protected boolean isRegistered;
46       
47        static {
48                XMLContext context = new XMLContext();
49                try {
50                        context.addClass(org.qcg.broker.schemas.resreqs.Task.class);
51                        unmarshaller = context.createUnmarshaller();
52                        marshaller = context.createMarshaller();
53                } catch (ResolverException e) {
54                        e.printStackTrace();
55                        unmarshaller = null;
56                        marshaller = null;
57                }
58        }
59       
60        protected org.qcg.broker.schemas.resreqs.Task task;
61        /*
62         * The values for following variables are obtained from native Task
63         * object. This should significantly speed up access to task details.
64         */
65        private DateTime startTime;
66        private DateTime endTime;
67        private DateTime brokerSubmitTime;
68        private ReadableDuration duration;
69        private List<AbstractProcessesGroup> groups;
70        private List<AbstractProcesses> processes;
71        private long length;
72        private int status;
73        private int senderId;
74        private long workloadLogWaitTime;
75       
76        private ResourceConsumptionProfile resourceConsumptionProfile;
77
78        public Task(org.qcg.broker.schemas.resreqs.Task task){
79                this.task = task;
80                this.startTime = null;
81                this.endTime = null;
82                this.brokerSubmitTime = null;
83                this.duration = null;
84                prepareTopology();
85        }
86       
87        public Task(String task) throws Exception{
88                StringReader reader = new StringReader(task);
89                this.task = (org.qcg.broker.schemas.resreqs.Task) unmarshaller.unmarshal(reader);
90                this.startTime = null;
91                this.endTime = null;
92                this.brokerSubmitTime = null;
93                this.duration = null;
94                prepareTopology();
95        }
96
97        private void preparePhases() {
98                List<ResourceConsumption> resourceConsumptionList = new ArrayList<ResourceConsumption>();
99               
100                if(task.getExecution() == null || task.getExecution().getResourceConsumptionProfile() == null){
101                        ResourceConsumption resConsumption = null;
102                        try {
103                                resConsumption = new ResourceConsumption(this.length, getComputingResourceRequirements());
104                        } catch (NoSuchFieldException e) {
105                                // TODO Auto-generated catch block
106                                e.printStackTrace();
107                        }
108                        resourceConsumptionList.add(resConsumption);
109                }
110                else{
111                        for(ResourceConsumptionType resConsumption: task.getExecution().getResourceConsumptionProfile().getResourceConsumption()){
112                                ResourceConsumption resourceConsumption = new ResourceConsumption(resConsumption);
113                                resourceConsumptionList.add(resourceConsumption);
114                        }
115                }
116                this.resourceConsumptionProfile = new ResourceConsumptionProfile(resourceConsumptionList);
117               
118        //      System.out.println("======"+task.getExecution().getExecutable().getApplication().getName());
119        }
120       
121        public DateTime getExecutionStartTime() throws NoSuchFieldException {
122                if(this.startTime != null)
123                        return this.startTime;
124               
125                ExecutionTimeType execTime = this.task.getExecutionTime();
126                if(execTime == null)
127                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
128                                                                                + " task "+ getId() + " is not defined.");
129               
130                TimePeriod timePeriod = execTime.getTimePeriod();
131                if(timePeriod == null)
132                        throw new NoSuchFieldException("Time Period for job " + getJobId()
133                                        + " task "+ getId() + " is not defined.");
134               
135                this.startTime = new DateTime(timePeriod.getPeriodStart());
136                return this.startTime;
137        }
138
139        public DateTime getExecutionEndTime() throws NoSuchFieldException {
140                if(this.endTime != null)
141                        return this.endTime;
142               
143                ExecutionTimeType execTime = this.task.getExecutionTime();
144                if(execTime == null)
145                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
146                                                                                + " task "+ getId() + " is not defined.");
147               
148                TimePeriod timePeriod = execTime.getTimePeriod();
149                if(timePeriod == null)
150                        throw new NoSuchFieldException("Time Period for job " + getJobId()
151                                        + " task "+ getId() + " is not defined.");
152               
153                TimePeriodChoice periodChoice = timePeriod.getTimePeriodChoice();
154                if(periodChoice == null)
155                        throw new NoSuchFieldException("Period End and Period Duration for job " + getJobId()
156                                        + " task "+ getId() + " are not defined.");
157               
158                java.util.Date periodEnd = periodChoice.getPeriodEnd();
159                if(periodEnd != null) {
160                        this.endTime = new DateTime(periodEnd);
161                        return this.endTime;
162                }
163               
164                org.exolab.castor.types.Duration duration = periodChoice.getPeriodDuration();
165                if(duration == null)
166                        throw new NoSuchFieldException("Period Duration for job " + getJobId()
167                                        + " task "+ getId() + " is not defined.");
168               
169                DateTime periodStart = getExecutionStartTime();
170                MutableDateTime m_periodEnd = periodStart.toMutableDateTime();
171                m_periodEnd.add(duration.toLong());
172
173                this.endTime = m_periodEnd.toDateTime();
174                periodChoice.setPeriodDuration(null);
175                periodChoice.setPeriodEnd(this.endTime.toDate());
176               
177                return this.endTime;
178        }
179
180        public ReadableDuration getExpectedDuration() throws NoSuchFieldException {
181                if(this.duration != null)
182                        return this.duration;
183               
184                ExecutionTimeType execTime = this.task.getExecutionTime();
185                if(execTime == null)
186                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
187                                                                                + " task "+ getId() + " is not defined.");
188               
189                org.exolab.castor.types.Duration d = execTime.getExecutionDuration();
190                if(d == null)
191                        throw new NoSuchFieldException("Execution Duration for job " + getJobId()
192                                        + " task "+ getId() + " is not defined.");
193
194                this.duration = new Duration(d.toLong());
195                return this.duration;
196        }
197
198        public String getJobId() {
199                return this.task.getJobId();
200        }
201
202        public double getParameterDoubleValue(ResourceParameterName parameterName)
203                        throws NoSuchFieldException, IllegalArgumentException {
204               
205                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
206               
207                switch (name) {
208                        case APPLICATION:
209                        case CPUARCH:
210                        case HOSTNAME:
211                        case LOCALRESOURCEMANAGER:
212                        case OSNAME:
213                        case OSRELEASE:
214                        case OSTYPE:
215                        case OSVERSION:
216                        case REMOTESUBMISSIONINTERFACE:
217                                throw new IllegalArgumentException("For " + parameterName + " use getParameterStringValue() method.");
218                }
219
220                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
221               
222                double returnValue = 0;
223                boolean notFound = true;
224               
225                for(int i = 0; i < item.length && notFound; i++){
226                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
227                        if(hostParameter == null)
228                                continue;
229                       
230                        if(name == hostParameter.getName()) {
231                                returnValue = hostParameter.getParameterTypeChoice().getParameterTypeChoiceItem(0).getParameterValue().getContent();
232                                notFound = false;
233                        }
234                }
235               
236                if(notFound)
237                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
238                                        + " task "+ getId() + " is not defined.");
239               
240                return returnValue;
241        }
242
243        public String getParameterStringValue(ResourceParameterName parameterName)
244                        throws NoSuchFieldException, IllegalArgumentException {
245                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
246               
247                switch (name) {
248                        case CPUCOUNT:
249                        case GPUCOUNT:
250                        case CPUSPEED:
251                        case DISKSPACE:
252                        case FREECPUS:
253                        case FREEDISKSPACE:
254                        case FREEMEMORY:
255                        case MEMORY:
256                                throw new IllegalArgumentException("For " + parameterName + " use getParameterDoubleValue() method.");
257                }
258               
259                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
260               
261                String returnValue = null;
262                boolean notFound = true;
263               
264                for(int i = 0; i < item.length && notFound; i++){
265                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
266                        if(hostParameter == null)
267                                continue;
268                       
269                        if(name == hostParameter.getName()) {
270                                returnValue = hostParameter.getStringValue(0).getValue();
271                                notFound = false;
272                        }
273                }
274               
275                if(notFound)
276                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
277                                        + " task "+ getId() + " is not defined.");
278               
279                return returnValue;
280        }
281
282        public DateTime getSubmissionTimeToBroker() {
283                if(this.brokerSubmitTime != null)
284                        return this.brokerSubmitTime;
285               
286                this.brokerSubmitTime = new DateTime(this.task.getSubmissionTime());
287               
288                return this.brokerSubmitTime;
289        }
290
291        public String getId() {
292                return this.task.getTaskId();
293        }
294
295        public String getUserDN() {
296                return this.task.getUserDN();
297        }
298
299        public org.qcg.broker.schemas.resreqs.Task getDescription() {
300                return this.task;
301        }
302
303        public String getDocument() throws Exception {
304                StringWriter writer = new StringWriter();
305               
306                marshaller.marshal(this.task, writer);
307               
308                return writer.toString();
309        }
310       
311        protected ComputingResourceBaseTypeItem[] getComputingResourceRequirements() throws NoSuchFieldException{
312               
313                Requirements req = this.task.getRequirements();
314                if(req == null)
315                        throw new NoSuchFieldException("Requierements section for job " + getJobId()
316                                                                                + " task "+ getId() + " is not defined.");
317               
318                TaskResourceRequirements taskReq = req.getTaskResourceRequirements();
319                if(taskReq == null)
320                        throw new NoSuchFieldException("Task resource requirements section for job " + getJobId()
321                                        + " task "+ getId() + " is not defined.");
322               
323                ComputingResource computingResource = taskReq.getComputingResource(0);
324                if(computingResource == null)
325                        throw new NoSuchFieldException("Computing resource requirement for job " + getJobId()
326                                        + " task "+ getId() + " is not defined.");
327               
328                ComputingResourceBaseTypeItem item[] = computingResource.getComputingResourceBaseTypeItem();
329                if(item == null || item.length == 0)
330                        throw new NoSuchFieldException("Computing resource requirement is empty for job " + getJobId()
331                                        + " task "+ getId());
332       
333                return item;
334        }
335       
336        public void setValue(ResourceParameterName parameterName, Object newValue) throws NoSuchFieldException{
337                boolean notFound = true;
338               
339                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
340               
341                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
342               
343                for(int i = 0; i < item.length && notFound; i++){
344                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
345                        if(hostParameter == null)
346                                continue;
347                       
348                        if(name == hostParameter.getName()) {
349                                hostParameter.
350                                                getParameterTypeChoice().
351                                                getParameterTypeChoiceItem(0).
352                                                getParameterValue().
353                                                setContent(((Integer)newValue).doubleValue());
354                                notFound = false;
355                        }
356                }
357               
358                if(notFound)
359                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
360                                        + " task "+ getId() + " is not defined.");
361        }
362
363        public List<AbstractProcessesGroup> getProcessesGroups() {
364                return this.groups;
365        }
366       
367        public List<AbstractProcesses> getProcesses(){
368                return this.processes;
369        }
370       
371        public List<AbstractProcesses> getProcesses(AbstractProcessesGroup processGroup){
372                if(this.processes == null)
373                        return null;
374               
375                List<AbstractProcesses> ret = new ArrayList<AbstractProcesses>();
376               
377                for(int i = 0; i < processes.size(); i++){
378                        AbstractProcesses p = processes.get(i);
379                        if(p.belongsTo(processGroup))
380                                ret.add(p);
381                }
382               
383                return ret;
384        }
385       
386        protected void prepareTopology(){
387                if(this.task.getRequirements() == null)
388                        return;
389               
390                if(this.task.getRequirements().getTopologyCount() < 1)
391                        return;
392               
393                Topology topology = this.task.getRequirements().getTopology(0);
394               
395                if(topology.getGroupCount() > 0){
396                        this.groups = new ArrayList<AbstractProcessesGroup>(topology.getGroupCount());
397                }
398               
399                for(int i = 0; i < topology.getGroupCount(); i++){
400                        this.groups.add(new ProcessesGroup(topology.getGroup(i)));
401                }
402
403                if(topology.getProcessesCount() > 0){
404                        this.processes = new ArrayList<AbstractProcesses>(topology.getProcessesCount());
405                }
406
407                for(int i = 0; i < topology.getProcessesCount(); i++){
408                        org.qcg.broker.schemas.resreqs.Processes p = topology.getProcesses(i);
409                        if(p.getProcessesResourceRequirements() == null){
410                                TaskResourceRequirements trr = this.task.getRequirements().getTaskResourceRequirements();
411                                if(trr != null) {
412                                        ProcessesResourceRequirements prr = new ProcessesResourceRequirements();
413                                       
414                                        for(int cridx = 0; cridx < trr.getComputingResourceCount(); cridx++){
415                                                ComputingResourceExtType cre = new ComputingResourceExtType();
416                                                ComputingResource cr = trr.getComputingResource(cridx);
417                                               
418                                                for(int j = 0; j < cr.getComputingResourceBaseTypeItemCount(); j++){
419                                                        cre.addComputingResourceBaseTypeItem(cr.getComputingResourceBaseTypeItem(j));
420                                                }
421                                               
422                                                prr.addComputingResource(cre);
423                                        }
424                                       
425                                        p.setProcessesResourceRequirements(prr);
426                                }
427                        }
428                       
429                        this.processes.add(new Processes(p));
430                }
431        }
432
433        public double getCpuCntRequest() throws NoSuchFieldException {
434                return getParameterDoubleValue(ResourceParameterName.CPUCOUNT);
435        }
436
437        public double getMemoryRequest() throws NoSuchFieldException {
438                return getParameterDoubleValue(ResourceParameterName.MEMORY);
439        }
440       
441        public long getLength() {
442                return this.length;
443        }
444
445        public int getStatus() {
446                return this.status;
447        }
448
449        public void setLength(long length) {
450                this.length = length;
451                preparePhases();
452        }
453
454        public void setStatus(int status){
455                this.status = status;
456        }
457       
458        public void setSenderId(int id){
459                this.senderId = id;
460        }
461       
462        public int getSenderId(){
463                return this.senderId;
464        }
465       
466        public boolean isFinished(){
467                if(processes == null)
468                        return (status > 3 && status <= 6);
469               
470                for(int i = 0; i < processes.size(); i++){
471                        if(!processes.get(i).isFinished())
472                                return false;
473                }
474               
475                return true;
476        }
477       
478        public long getWorkloadLogWaitTime() {
479                return workloadLogWaitTime;
480        }
481
482        public void setWorkloadLogWaitTime(long waitTime) {
483                this.workloadLogWaitTime = waitTime;
484        }
485       
486        /*public void addToResPath(String resName){
487                if(resPathHistory == null)
488                        resPathHistory = new String();
489                resPathHistory = new StringBuffer(resPathHistory).append(resName).append("_").toString();
490
491        }
492       
493        public String getResPath(){
494                return this.resPathHistory;
495
496        }*/
497
498
499        @Override
500        public int getUserId() {
501                // TODO Auto-generated method stub
502                return 0;
503        }
504
505        public boolean isRegistered() {
506                return isRegistered;
507        }
508
509        public void register(JobRegistryImpl jobRegistry) {
510                isRegistered = true;
511        }
512
513        public void accept(WorkloadUnitHandler wuh) {
514                wuh.handleTask(this);
515        }
516       
517        public ResourceConsumptionProfile getResourceConsumptionProfile(){
518                return resourceConsumptionProfile;
519        }
520       
521        public String getApplicationName(){
522                return task.getExecution().getExecutable().getApplication().getName();
523        }
524}
Note: See TracBrowser for help on using the repository browser.