source: DCWoRMS/trunk/src/schedframe/scheduling/tasks/Task.java @ 481

Revision 481, 14.3 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.tasks;
2
3import org.qcg.broker.schemas.resreqs.ComputingResource;
4import org.qcg.broker.schemas.resreqs.ComputingResourceBaseTypeItem;
5import org.qcg.broker.schemas.resreqs.ComputingResourceExtType;
6import org.qcg.broker.schemas.resreqs.ComputingResourceParameterType;
7import org.qcg.broker.schemas.resreqs.ExecutionTimeType;
8import org.qcg.broker.schemas.resreqs.ProcessesResourceRequirements;
9import org.qcg.broker.schemas.resreqs.Requirements;
10import org.qcg.broker.schemas.resreqs.TaskResourceRequirements;
11import org.qcg.broker.schemas.resreqs.TimePeriod;
12import org.qcg.broker.schemas.resreqs.TimePeriodChoice;
13import org.qcg.broker.schemas.resreqs.Topology;
14import org.qcg.broker.schemas.resreqs.types.ComputingResourceParameterTypeNameType;
15
16import java.io.StringReader;
17import java.io.StringWriter;
18import java.util.ArrayList;
19import java.util.List;
20
21import org.exolab.castor.xml.Marshaller;
22import org.exolab.castor.xml.ResolverException;
23import org.exolab.castor.xml.Unmarshaller;
24import org.exolab.castor.xml.XMLContext;
25import org.joda.time.DateTime;
26import org.joda.time.Duration;
27import org.joda.time.MutableDateTime;
28import org.joda.time.ReadableDuration;
29
30import schedframe.scheduling.WorkloadUnitHandler;
31import schedframe.scheduling.manager.tasks.JobRegistryImpl;
32import schedframe.scheduling.tasks.requirements.ResourceParameterName;
33
34/**
35 *
36 * @author Marcin Krystek
37 *
38 */
39public class Task implements TaskInterface<org.qcg.broker.schemas.resreqs.Task> {
40       
41        protected static Unmarshaller unmarshaller;
42        protected static Marshaller marshaller;
43        protected boolean isRegistered;
44       
45        static {
46                XMLContext context = new XMLContext();
47                try {
48                        context.addClass(org.qcg.broker.schemas.resreqs.Task.class);
49                        unmarshaller = context.createUnmarshaller();
50                        marshaller = context.createMarshaller();
51                } catch (ResolverException e) {
52                        e.printStackTrace();
53                        unmarshaller = null;
54                        marshaller = null;
55                }
56        }
57       
58        protected org.qcg.broker.schemas.resreqs.Task task;
59        /*
60         * The values for following variables are obtained from native Task
61         * object. This should significantly speed up access to task details.
62         */
63        private DateTime startTime;
64        private DateTime endTime;
65        private DateTime brokerSubmitTime;
66        private ReadableDuration duration;
67        private List<AbstractProcessesGroup> groups;
68        private List<AbstractProcesses> processes;
69        private long length;
70        private int status;
71        private int senderId;
72        private long workloadLogWaitTime;
73
74        public Task(org.qcg.broker.schemas.resreqs.Task task){
75                this.task = task;
76                this.startTime = null;
77                this.endTime = null;
78                this.brokerSubmitTime = null;
79                this.duration = null;
80        //      this.gridletID_ = (getJobId() + "_" + getId()).hashCode();
81                prepareTopology();
82        }
83       
84        public Task(String task) throws Exception{
85                StringReader reader = new StringReader(task);
86                this.task = (org.qcg.broker.schemas.resreqs.Task) unmarshaller.unmarshal(reader);
87                this.startTime = null;
88                this.endTime = null;
89                this.brokerSubmitTime = null;
90                this.duration = null;
91        //      this.gridletID_ = (getJobId() + getId()).hashCode();
92                prepareTopology();
93        }
94
95        public DateTime getExecutionStartTime() throws NoSuchFieldException {
96                if(this.startTime != null)
97                        return this.startTime;
98               
99                ExecutionTimeType execTime = this.task.getExecutionTime();
100                if(execTime == null)
101                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
102                                                                                + " task "+ getId() + " is not defined.");
103               
104                TimePeriod timePeriod = execTime.getTimePeriod();
105                if(timePeriod == null)
106                        throw new NoSuchFieldException("Time Period for job " + getJobId()
107                                        + " task "+ getId() + " is not defined.");
108               
109                this.startTime = new DateTime(timePeriod.getPeriodStart());
110                return this.startTime;
111        }
112
113        public DateTime getExecutionEndTime() throws NoSuchFieldException {
114                if(this.endTime != null)
115                        return this.endTime;
116               
117                ExecutionTimeType execTime = this.task.getExecutionTime();
118                if(execTime == null)
119                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
120                                                                                + " task "+ getId() + " is not defined.");
121               
122                TimePeriod timePeriod = execTime.getTimePeriod();
123                if(timePeriod == null)
124                        throw new NoSuchFieldException("Time Period for job " + getJobId()
125                                        + " task "+ getId() + " is not defined.");
126               
127                TimePeriodChoice periodChoice = timePeriod.getTimePeriodChoice();
128                if(periodChoice == null)
129                        throw new NoSuchFieldException("Period End and Period Duration for job " + getJobId()
130                                        + " task "+ getId() + " are not defined.");
131               
132                java.util.Date periodEnd = periodChoice.getPeriodEnd();
133                if(periodEnd != null) {
134                        this.endTime = new DateTime(periodEnd);
135                        return this.endTime;
136                }
137               
138                org.exolab.castor.types.Duration duration = periodChoice.getPeriodDuration();
139                if(duration == null)
140                        throw new NoSuchFieldException("Period Duration for job " + getJobId()
141                                        + " task "+ getId() + " is not defined.");
142               
143                DateTime periodStart = getExecutionStartTime();
144                MutableDateTime m_periodEnd = periodStart.toMutableDateTime();
145                m_periodEnd.add(duration.toLong());
146
147                this.endTime = m_periodEnd.toDateTime();
148                periodChoice.setPeriodDuration(null);
149                periodChoice.setPeriodEnd(this.endTime.toDate());
150               
151                return this.endTime;
152        }
153
154        public ReadableDuration getExpectedDuration() throws NoSuchFieldException {
155                if(this.duration != null)
156                        return this.duration;
157               
158                ExecutionTimeType execTime = this.task.getExecutionTime();
159                if(execTime == null)
160                        throw new NoSuchFieldException("Execution Time for job " + getJobId()
161                                                                                + " task "+ getId() + " is not defined.");
162               
163                org.exolab.castor.types.Duration d = execTime.getExecutionDuration();
164                if(d == null)
165                        throw new NoSuchFieldException("Execution Duration for job " + getJobId()
166                                        + " task "+ getId() + " is not defined.");
167
168                this.duration = new Duration(d.toLong());
169                return this.duration;
170        }
171
172        public String getJobId() {
173                return this.task.getJobId();
174        }
175
176        public double getParameterDoubleValue(ResourceParameterName parameterName)
177                        throws NoSuchFieldException, IllegalArgumentException {
178               
179                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
180               
181                switch (name) {
182                        case APPLICATION:
183                        case CPUARCH:
184                        case HOSTNAME:
185                        case LOCALRESOURCEMANAGER:
186                        case OSNAME:
187                        case OSRELEASE:
188                        case OSTYPE:
189                        case OSVERSION:
190                        case REMOTESUBMISSIONINTERFACE:
191                                throw new IllegalArgumentException("For " + parameterName + " use getParameterStringValue() method.");
192                }
193
194                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
195               
196                double returnValue = 0;
197                boolean notFound = true;
198               
199                for(int i = 0; i < item.length && notFound; i++){
200                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
201                        if(hostParameter == null)
202                                continue;
203                       
204                        if(name == hostParameter.getName()) {
205                                returnValue = hostParameter.getParameterTypeChoice().getParameterTypeChoiceItem(0).getParameterValue().getContent();
206                                notFound = false;
207                        }
208                }
209               
210                if(notFound)
211                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
212                                        + " task "+ getId() + " is not defined.");
213               
214                return returnValue;
215        }
216
217        public String getParameterStringValue(ResourceParameterName parameterName)
218                        throws NoSuchFieldException, IllegalArgumentException {
219                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
220               
221                switch (name) {
222                        case CPUCOUNT:
223                        case GPUCOUNT:
224                        case CPUSPEED:
225                        case DISKSPACE:
226                        case FREECPUS:
227                        case FREEDISKSPACE:
228                        case FREEMEMORY:
229                        case MEMORY:
230                                throw new IllegalArgumentException("For " + parameterName + " use getParameterDoubleValue() method.");
231                }
232               
233                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
234               
235                String returnValue = null;
236                boolean notFound = true;
237               
238                for(int i = 0; i < item.length && notFound; i++){
239                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
240                        if(hostParameter == null)
241                                continue;
242                       
243                        if(name == hostParameter.getName()) {
244                                returnValue = hostParameter.getStringValue(0).getValue();
245                                notFound = false;
246                        }
247                }
248               
249                if(notFound)
250                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
251                                        + " task "+ getId() + " is not defined.");
252               
253                return returnValue;
254        }
255
256        public DateTime getSubmissionTimeToBroker() {
257                if(this.brokerSubmitTime != null)
258                        return this.brokerSubmitTime;
259               
260                this.brokerSubmitTime = new DateTime(this.task.getSubmissionTime());
261               
262                return this.brokerSubmitTime;
263        }
264
265        public String getId() {
266                return this.task.getTaskId();
267        }
268
269        public String getUserDN() {
270                return this.task.getUserDN();
271        }
272
273        public org.qcg.broker.schemas.resreqs.Task getDescription() {
274                return this.task;
275        }
276
277        public String getDocument() throws Exception {
278                StringWriter writer = new StringWriter();
279               
280                marshaller.marshal(this.task, writer);
281               
282                return writer.toString();
283        }
284       
285        protected ComputingResourceBaseTypeItem[] getComputingResourceRequirements() throws NoSuchFieldException{
286               
287                Requirements req = this.task.getRequirements();
288                if(req == null)
289                        throw new NoSuchFieldException("Requierements section for job " + getJobId()
290                                                                                + " task "+ getId() + " is not defined.");
291               
292                TaskResourceRequirements taskReq = req.getTaskResourceRequirements();
293                if(taskReq == null)
294                        throw new NoSuchFieldException("Task resource requirements section for job " + getJobId()
295                                        + " task "+ getId() + " is not defined.");
296               
297                ComputingResource computingResource = taskReq.getComputingResource(0);
298                if(computingResource == null)
299                        throw new NoSuchFieldException("Computing resource requirement for job " + getJobId()
300                                        + " task "+ getId() + " is not defined.");
301               
302                ComputingResourceBaseTypeItem item[] = computingResource.getComputingResourceBaseTypeItem();
303                if(item == null || item.length == 0)
304                        throw new NoSuchFieldException("Computing resource requirement is empty for job " + getJobId()
305                                        + " task "+ getId());
306       
307                return item;
308        }
309       
310        public void setValue(ResourceParameterName parameterName, Object newValue) throws NoSuchFieldException{
311                boolean notFound = true;
312               
313                ComputingResourceParameterTypeNameType name = ComputingResourceParameterTypeNameType.valueOf(parameterName.value().toUpperCase());
314               
315                ComputingResourceBaseTypeItem item[] = getComputingResourceRequirements();
316               
317                for(int i = 0; i < item.length && notFound; i++){
318                        ComputingResourceParameterType hostParameter = item[i].getHostParameter();
319                        if(hostParameter == null)
320                                continue;
321                       
322                        if(name == hostParameter.getName()) {
323                                hostParameter.
324                                                getParameterTypeChoice().
325                                                getParameterTypeChoiceItem(0).
326                                                getParameterValue().
327                                                setContent(((Integer)newValue).doubleValue());
328                                notFound = false;
329                        }
330                }
331               
332                if(notFound)
333                        throw new NoSuchFieldException(parameterName + " for job " + getJobId()
334                                        + " task "+ getId() + " is not defined.");
335        }
336
337        public List<AbstractProcessesGroup> getProcessesGroups() {
338                return this.groups;
339        }
340       
341        public List<AbstractProcesses> getProcesses(){
342                return this.processes;
343        }
344       
345        public List<AbstractProcesses> getProcesses(AbstractProcessesGroup processGroup){
346                if(this.processes == null)
347                        return null;
348               
349                List<AbstractProcesses> ret = new ArrayList<AbstractProcesses>();
350               
351                for(int i = 0; i < processes.size(); i++){
352                        AbstractProcesses p = processes.get(i);
353                        if(p.belongsTo(processGroup))
354                                ret.add(p);
355                }
356               
357                return ret;
358        }
359       
360        protected void prepareTopology(){
361                if(this.task.getRequirements() == null)
362                        return;
363               
364                if(this.task.getRequirements().getTopologyCount() < 1)
365                        return;
366               
367                Topology topology = this.task.getRequirements().getTopology(0);
368               
369                if(topology.getGroupCount() > 0){
370                        this.groups = new ArrayList<AbstractProcessesGroup>(topology.getGroupCount());
371                }
372               
373                for(int i = 0; i < topology.getGroupCount(); i++){
374                        this.groups.add(new ProcessesGroup(topology.getGroup(i)));
375                }
376
377                if(topology.getProcessesCount() > 0){
378                        this.processes = new ArrayList<AbstractProcesses>(topology.getProcessesCount());
379                }
380
381                for(int i = 0; i < topology.getProcessesCount(); i++){
382                        org.qcg.broker.schemas.resreqs.Processes p = topology.getProcesses(i);
383                        if(p.getProcessesResourceRequirements() == null){
384                                TaskResourceRequirements trr = this.task.getRequirements().getTaskResourceRequirements();
385                                if(trr != null) {
386                                        ProcessesResourceRequirements prr = new ProcessesResourceRequirements();
387                                       
388                                        for(int cridx = 0; cridx < trr.getComputingResourceCount(); cridx++){
389                                                ComputingResourceExtType cre = new ComputingResourceExtType();
390                                                ComputingResource cr = trr.getComputingResource(cridx);
391                                               
392                                                for(int j = 0; j < cr.getComputingResourceBaseTypeItemCount(); j++){
393                                                        cre.addComputingResourceBaseTypeItem(cr.getComputingResourceBaseTypeItem(j));
394                                                }
395                                               
396                                                prr.addComputingResource(cre);
397                                        }
398                                       
399                                        p.setProcessesResourceRequirements(prr);
400                                }
401                        }
402                       
403                        this.processes.add(new Processes(p));
404                }
405        }
406
407        public double getCpuCntRequest() throws NoSuchFieldException {
408                return getParameterDoubleValue(ResourceParameterName.CPUCOUNT);
409        }
410
411        public double getMemoryRequest() throws NoSuchFieldException {
412                return getParameterDoubleValue(ResourceParameterName.MEMORY);
413        }
414       
415        public long getLength() {
416                return this.length;
417        }
418
419        public int getStatus() {
420                return this.status;
421        }
422
423        public void setLength(long length) {
424                this.length = length;
425        }
426
427        public void setStatus(int status){
428                this.status = status;
429        }
430       
431        public void setSenderId(int id){
432                this.senderId = id;
433        }
434       
435        public int getSenderId(){
436                return this.senderId;
437        }
438       
439        public boolean isFinished(){
440                if(processes == null)
441                        return (status > 3 && status <= 6);
442               
443                for(int i = 0; i < processes.size(); i++){
444                        if(!processes.get(i).isFinished())
445                                return false;
446                }
447               
448                return true;
449        }
450       
451        public long getWorkloadLogWaitTime() {
452                return workloadLogWaitTime;
453        }
454
455        public void setWorkloadLogWaitTime(long waitTime) {
456                this.workloadLogWaitTime = waitTime;
457        }
458       
459        /*public void addToResPath(String resName){
460                if(resPathHistory == null)
461                        resPathHistory = new String();
462                resPathHistory = new StringBuffer(resPathHistory).append(resName).append("_").toString();
463
464        }
465       
466        public String getResPath(){
467                return this.resPathHistory;
468
469        }*/
470
471
472        @Override
473        public int getUserId() {
474                // TODO Auto-generated method stub
475                return 0;
476        }
477
478        public boolean isRegistered() {
479                return isRegistered;
480        }
481
482        public void register(JobRegistryImpl jobRegistry) {
483                isRegistered = true;
484        }
485
486        public void accept(WorkloadUnitHandler wuh) {
487                wuh.handleTask(this);
488        }
489}
Note: See TracBrowser for help on using the repository browser.