source: xssim/src/gridsim/gssim/policy/ARAllocationPolicy.java @ 104

Revision 104, 23.3 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package gridsim.gssim.policy;
2
3import eduni.simjava.Sim_event;
4import gridsim.GridSimTags;
5import gridsim.Gridlet;
6import gridsim.gssim.GssimConstants;
7import gridsim.gssim.GssimTags;
8import gridsim.gssim.ResourceHistoryItem;
9import gridsim.gssim.SubmittedTask;
10import gssim.schedframe.scheduling.AbstractExecutable;
11import gssim.schedframe.scheduling.ExecTaskInterface;
12import gssim.schedframe.scheduling.Executable;
13import gssim.schedframe.scheduling.plugin.local.GssimTimeOperations;
14import gssim.schedframe.scheduling.plugin.local.LocalReservationManagerImpl;
15
16import java.util.ArrayList;
17import java.util.Collection;
18import java.util.HashMap;
19import java.util.List;
20import java.util.Map;
21
22import org.apache.commons.logging.Log;
23import org.apache.commons.logging.LogFactory;
24import org.joda.time.DateTime;
25import org.joda.time.DateTimeUtilsExt;
26
27import schedframe.exceptions.ReservationException;
28import schedframe.resources.ExecutingResourceDescription;
29import schedframe.resources.ResourceProvider;
30import schedframe.resources.units.Processors;
31import schedframe.resources.units.ResourceUnit;
32import schedframe.scheduling.AbstractResourceRequirements;
33import schedframe.scheduling.AbstractTimeRequirements;
34import schedframe.scheduling.Offer;
35import schedframe.scheduling.Queue;
36import schedframe.scheduling.Reservation;
37import schedframe.scheduling.ResourceUsage;
38import schedframe.scheduling.TaskInterface;
39import schedframe.scheduling.TimeResourceAllocation;
40import schedframe.scheduling.events.ReservationActiveEvent;
41import schedframe.scheduling.events.SchedulingEvent;
42import schedframe.scheduling.events.SchedulingEventType;
43import schedframe.scheduling.events.SchedulingResponseType;
44import schedframe.scheduling.events.StartTaskExecutionEvent;
45import schedframe.scheduling.events.TaskRequestedTimeExpiredEvent;
46import schedframe.scheduling.plugin.SchedulingPluginConfiguration;
47import schedframe.scheduling.plugin.local.LocalSchedulingARPlugin;
48import schedframe.scheduling.plugin.local.ResourceAllocationInterface;
49import schedframe.scheduling.utils.ResourceParameterName;
50
51/**
52 *
53 * @author Marcin Krystek
54 *
55 */
56public class ARAllocationPolicy extends AbstractAllocationPolicy {
57
58        private Log log = LogFactory.getLog(ARAllocationPolicy.class);
59       
60        protected LocalSchedulingARPlugin arSchedulingPlugin;
61       
62        protected LocalReservationManagerImpl reservationManager;
63       
64        protected HashMap<String, SubmittedTask> finishedTasks;
65       
66        public ARAllocationPolicy(ResourceProvider provider, String entityName,
67                                                                String schedulingPluginClassName,
68                                                                String execTimeEstimationPluginClassName,
69                                                                ExecutingResourceDescription resourceDescription)
70                                                                throws Exception {
71               
72                super(provider, entityName,
73                                schedulingPluginClassName,
74                                execTimeEstimationPluginClassName,
75                                resourceDescription);
76               
77                this.arSchedulingPlugin = (LocalSchedulingARPlugin) this.schedulingPlugin;
78                this.reservationManager = new LocalReservationManagerImpl(0, new GssimTimeOperations(), this.provider);
79                this.finishedTasks = new HashMap<String, SubmittedTask>();
80        }
81       
82       
83       
84        public List<Offer> getOffer(AbstractTimeRequirements<?> timeRequirements,
85                                                                AbstractResourceRequirements<?> resourceRequirements)
86                                                                throws ReservationException{
87                updateProcessingTimes();
88                List<Offer> result = null;
89               
90                result = arSchedulingPlugin.getOffers(timeRequirements, resourceRequirements,
91                                                                                        inExecution, queues,
92                                                                                        getResourceUnitsManager(),
93                                                                                        reservationManager);
94                return result;
95        }
96
97       
98        public Reservation createReservation(AbstractTimeRequirements<?> timeRequirements,
99                                                                AbstractResourceRequirements<?> resourceRequirements)
100                                                                throws ReservationException{
101                updateProcessingTimes();
102                Reservation result = null;
103               
104                result = arSchedulingPlugin.createReservation(timeRequirements, resourceRequirements,
105                                                                                                        inExecution,
106                                                                                                        queues,
107                                                                                                        getResourceUnitsManager(),
108                                                                                                        reservationManager);
109               
110                return result;
111               
112        }
113       
114       
115        public List<Reservation> createReservation(ResourceUsage resourceUsage)
116                                                                throws ReservationException{
117                updateProcessingTimes();
118               
119                List<Reservation> result = null;
120               
121                result = arSchedulingPlugin.createReservation(resourceUsage,
122                                                                                                        inExecution,
123                                                                                                        queues,
124                                                                                                        getResourceUnitsManager(),
125                                                                                                        reservationManager);
126               
127               
128               
129                return result;
130        }
131       
132       
133        public Reservation commitReservation(Reservation reservation)
134                                                                throws ReservationException {
135                updateProcessingTimes();
136               
137                Reservation result = null;
138               
139                result = arSchedulingPlugin.commitReservation(reservation,
140                                                                                                        inExecution,
141                                                                                                        queues,
142                                                                                                        getResourceUnitsManager(),
143                                                                                                        reservationManager);
144               
145/*              if("23".equals(reservation.getJobId())){
146                        int a = 1;
147                }
148                Processors processors;
149                try {
150                        processors = (Processors) reservation.getAllocatedResource().
151                                                                                                        getResourceUnit(ResourceParameterName.CPUCOUNT);
152                        if(processors != null){
153                                Processors choosenResources = getResourceUnitsManager().chooseProcessors(processors);
154                                getResourceUnitsManager().createResourceReservation(reservation, choosenResources);
155                        } else {
156                                log.error("SOMETHING IS WRONG HERE. " +
157                                                "THERE SHOULD BE SOME RESOURCE REQUIREMENT IN THIS RESERVATION.\n" + reservation);
158                        }
159                       
160                } catch (NoSuchFieldException e) {
161                        e.printStackTrace();
162                }
163*/             
164                return result;
165               
166        }
167       
168        public Reservation commitReservation(Reservation reservation,
169                                                                TimeResourceAllocation resourceUsage)
170                                                                throws ReservationException {
171                updateProcessingTimes();
172               
173                Reservation result = null;
174               
175                result = arSchedulingPlugin.commitReservation(reservation,
176                                                                                                        resourceUsage,
177                                                                                                        inExecution,
178                                                                                                        queues,
179                                                                                                        getResourceUnitsManager(),
180                                                                                                        reservationManager);
181               
182                return result;
183        }
184       
185        public void modifyReservation(Reservation reservation,
186                                                                TimeResourceAllocation resourceUsage)
187                                                                throws ReservationException {
188               
189                updateProcessingTimes();
190               
191                arSchedulingPlugin.modifyReservation(reservation, resourceUsage,
192                                                                                        inExecution, queues,
193                                                                                        getResourceUnitsManager(),
194                                                                                        reservationManager);
195               
196        }
197       
198        public void cancelReservation(Reservation reservation)
199                                                                throws ReservationException {
200                updateProcessingTimes();
201               
202                arSchedulingPlugin.cancelReservation(reservation, inExecution, queues, reservationManager);
203        }
204       
205        public Reservation.Status checkStatus(Reservation reservation)
206                                                                throws ReservationException{
207                updateProcessingTimes();
208               
209                Reservation.Status status = arSchedulingPlugin.getStatus(reservation, reservationManager);
210               
211                return status;
212        }
213       
214
215        public void gridletSubmit(Gridlet gridlet, boolean ack) {
216                if(gridlet instanceof AbstractExecutable){
217                        AbstractExecutable gl = (AbstractExecutable) gridlet;
218                        updateProcessingTimes();
219                        SubmittedTask mrg = new SubmittedTask(gl);
220                        newTasks.add(mrg);
221                        try {
222                                arSchedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceUnitsManager(), reservationManager);
223                //              arSchedulingPlugin.schedule(new SchedulingEvent(SchedulingEventType.TASK_ARRIVED),
224                //                                                                      inExecution,
225                //                                                                      queues,
226                //                                                                      getResourceUnitsManager(),
227                //                                                                      reservationManager);
228                               
229                        } catch (ReservationException e) {
230                                sendException(gridlet.getUserID(), e);
231                        }
232                       
233                        submitedGridlets.put(new Integer(mrg.getGridletID()), mrg);
234        //              replyGridletSubmit(true, gl.getGridletID(), gl.getUserID());
235                }
236        }
237
238       
239       
240        protected void sendException(int dest, Exception e){
241               
242        }
243       
244        public void processOtherEvent(Sim_event ev){
245                updateProcessingTimes();
246               
247                int tag = ev.get_tag();
248                Object obj;
249               
250                switch(tag){
251               
252                        case GssimTags.TIMER:
253                               
254                                if(pluginSupportsEvent(tag)){
255                                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
256                                        arSchedulingPlugin.schedule(event,
257                                                                                                inExecution,
258                                                                                                queues,
259                                                                                                getResourceUnitsManager(),
260                                                                                                reservationManager);
261                                       
262                                        arSchedulingPlugin.schedule(event,
263                                                        inExecution,
264                                                        queues,
265                                                        getResourceUnitsManager());
266                                }
267                               
268                                SchedulingPluginConfiguration config = schedulingPlugin.getConfiguration();
269                                if(config != null){
270                                        Map<SchedulingEventType, Object> events = config.getServedEvents();
271                                        if(events != null){
272                                                int delay = (Integer) events.get(SchedulingEventType.TIMER);
273                                                this.timerHandler.notify(delay, null);
274                                        }
275                                }
276                                break;
277                               
278                        case GssimTags.TASK_READY_FOR_EXECUTION:
279                                Executable data = (Executable) ev.get_data();
280                                try {
281                                        data.setGridletStatus(Gridlet.READY);
282                                       
283                                /*      Reservation.Status r_stat = this.reservationManager.getReservStatus(data.getReservationId());
284                                        if(r_stat == Reservation.Status.ACTIVE){
285                                                ResourceUnitsManagerImpl manager = getResourceUnitsManager();
286                                                if(!manager.areResourcesReserved(data)){
287                                                        Map<ResourceParameterName, ResourceUnit> resources = manager.chooseResourcesFor(data);
288                                                        if(resources != null)
289                                                                manager.createResourceReservation(data, resources);
290                                                }
291                                        }
292                                */     
293                                        if(pluginSupportsEvent(tag)){
294                                                SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
295                                                arSchedulingPlugin.schedule(event,
296                                                                                                        inExecution,
297                                                                                                        queues,
298                                                                                                        getResourceUnitsManager(),
299                                                                                                        reservationManager);
300                                               
301                                                arSchedulingPlugin.schedule(event,
302                                                                inExecution,
303                                                                queues,
304                                                                getResourceUnitsManager());
305                                               
306                                        }
307                                } catch (Exception e) {
308                                        e.printStackTrace();
309                                }
310                               
311                                break;
312                       
313                        case GssimTags.TASK_EXECUTION_FINISHED:
314                                obj = ev.get_data();
315                                SubmittedTask t = (SubmittedTask) obj;
316                               
317                                if(t.getStatus() == Gridlet.INEXEC){
318                                        t.setGridletStatus(Gridlet.SUCCESS);
319                                        t.finalizeGridlet();
320                                        inExecution.remove(t);
321                //                      this.finishedTasks.put(t.getJobId()+"_"+t.getId(), t);
322                                       
323                                        super.sendFinishGridlet(t.getGridlet());
324                                        if(pluginSupportsEvent(tag)) {
325                                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
326                                                arSchedulingPlugin.schedule(event,
327                                                                                                        inExecution,
328                                                                                                        queues,
329                                                                                                        getResourceUnitsManager(),
330                                                                                                        reservationManager);
331                                               
332                                                arSchedulingPlugin.schedule(event,
333                                                                inExecution,
334                                                                queues,
335                                                                getResourceUnitsManager());
336                                        }
337                                }
338                               
339                                break;
340                               
341                        case GridSimTags.AR_STATUS_ACTIVE:
342                                obj = ev.get_data();
343                                String resId = (String) obj;
344                                List<Reservation> list = reservationManager.getReservations();
345                                for(int i = 0; i < list.size(); i++){
346                                        Reservation r = list.get(i);
347                                        if(r.getId().equals(resId)){
348                                                r.setStatus(Reservation.Status.ACTIVE);
349                                                long delay = (r.getEndMillis() - DateTimeUtilsExt.currentTimeMillis()) / 1000;
350                                                log.info("reservation for " + r.getJobId() + " ends after: " + delay);
351                                //              ExecTaskInterface<?> task = findTask(r, queues);
352
353                                                // task may not be submitted to the resource yet. This may happen when task submit time
354                                                // and reservation start time are equal
355                                //              if(task != null){
356                                //                      Map<ResourceParameterName, ResourceUnit> choosenResources = getResourceUnitsManager().chooseResourcesFor(task);
357                                //                      getResourceUnitsManager().createResourceReservation(task, choosenResources);
358                                //              }
359                                               
360                                                this.reservationCompletedHandler.notify(Long.valueOf(delay).doubleValue(), r);
361                                                if(pluginSupportsEvent(tag))
362                                                        arSchedulingPlugin.schedule(new ReservationActiveEvent(r),
363                                                                                                        inExecution,
364                                                                                                        queues,
365                                                                                                        getResourceUnitsManager(),
366                                                                                                        reservationManager);
367                                               
368                                               
369                                                break;
370                                        }
371                                }
372
373                                break;
374                       
375                        case GridSimTags.AR_STATUS_EXPIRED:
376                                log.error("Implement reservation expiration handler");
377                                break;
378
379                        case GridSimTags.AR_STATUS_COMPLETED:
380                                Reservation reservation = (Reservation) ev.get_data();
381                                reservation.setStatus(Reservation.Status.FINISHED);
382                                log.debug("finishing reservation: " + reservation);
383                                String jt = reservation.getJobId() + "_" + reservation.getTaskId();
384        /*                      SubmittedTask task = this.finishedTasks.get(jt);
385                               
386                                if(task != null){
387                                        List<Map<ResourceParameterName, ResourceUnit>> lastUsed = task.getUsedResources();
388                                        getResourceUnitsManager().removeResourceReservation(lastUsed.get(lastUsed.size() - 1));
389                                        this.finishedTasks.remove(jt);
390                                        return;
391                                }
392        */                     
393                                int index;
394                                for(index = 0; index < inExecution.size(); index++){
395                                        SubmittedTask st = inExecution.get(index);
396                                       
397                                        if(st.getReservationID() == Integer.valueOf(reservation.getId()).intValue()){
398                                               
399                                                Integer gridletId = Integer.valueOf(st.getGridletID());
400                                                Map<String, Object> gridletHistory = history.get(gridletId);
401                                                DateTime expectedEndTime = (DateTime) gridletHistory.get(GssimConstants.END_TIME);
402                                                DateTime currentTime = new DateTime();
403                                                if(expectedEndTime.isAfter(currentTime)){
404                                                        st.setGridletStatus(Gridlet.CANCELED);
405                                                        st.finalizeGridlet();
406                                                        inExecution.remove(index);
407                                                        gridletHistory.put(GssimConstants.END_TIME, currentTime);
408                                                        log.debug("stop task execution: " + jt);
409                                                        super.sendCancelGridlet(GridSimTags.GRIDLET_CANCEL, st.getGridlet(), gridletId, st.getUserID());
410                                                        List<ResourceHistoryItem> lastUsed = st.getUsedResources();
411                                                        getResourceUnitsManager().removeResourceReservation(lastUsed.get(lastUsed.size() - 1).getResourceUnits());
412                                                        return;
413                                                }
414                                        }
415                                }
416                               
417                                       
418                               
419                                Queue<TaskInterface<?>> queue = (Queue<TaskInterface<?>>) queues.get(0);
420                                String jobId = reservation.getJobId();
421                                String taskId = reservation.getTaskId();
422                               
423                                for(int i = 0; i < queue.size(); i++){
424                                        TaskInterface<?> queuedTask = queue.get(i);
425                                        if(jobId.equals(queuedTask.getJobId()) && taskId.equals(queuedTask.getId())){
426                                                SubmittedTask st = (SubmittedTask) queuedTask;
427                                                st.setGridletStatus(Gridlet.FAILED);
428                                                st.finalizeGridlet();
429                                                queue.remove(i);
430                                                log.debug("task didnt event start. remove from queue");
431                                                super.sendCancelGridlet(GridSimTags.GRIDLET_CANCEL, st.getGridlet(), st.getGridletID(), st.getUserID());
432                                                break;
433                                        }
434                                }
435                       
436                                break;
437                        case GssimTags.TASK_REQUESTED_TIME_EXPIRED:
438                                obj = ev.get_data();
439                                t = (SubmittedTask) obj;
440                                        if(pluginSupportsEvent(tag)) {
441                                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(t.getJobId(), t.getId());
442                                                SchedulingResponseType responseEvent = arSchedulingPlugin.handleResourceAllocationViolation(event,
443                                                                                                        inExecution,
444                                                                                                        queues,
445                                                                                                        getResourceUnitsManager(),
446                                                                                                        reservationManager);
447                                                if(responseEvent == SchedulingResponseType.TERMINATE_TASK){
448                                                        if(t.getStatus() == Gridlet.INEXEC){
449                                                                t.setGridletStatus(Gridlet.SUCCESS);
450                                                                t.finalizeGridlet();
451                                                                inExecution.remove(t);
452                                                                Integer gridletId = Integer.valueOf(t.getGridletID());
453                                                                Map<String, Object> gridletHistory = history.get(gridletId);
454                                                                DateTime currentTime = new DateTime();
455                                                                gridletHistory.put(GssimConstants.END_TIME, currentTime);
456                                                                super.sendFinishGridlet(t.getGridlet());
457                                                        }
458                                                }
459                                                else if(responseEvent == SchedulingResponseType.KILL_TASK){
460                                                        if(t.getStatus() == Gridlet.INEXEC){
461                                                                t.setGridletStatus(Gridlet.CANCELED);
462                                                                t.finalizeGridlet();
463                                                                inExecution.remove(t);
464                                                                Integer gridletId = Integer.valueOf(t.getGridletID());
465                                                                Map<String, Object> gridletHistory = history.get(gridletId);
466                                                                DateTime currentTime = new DateTime();
467                                                                gridletHistory.put(GssimConstants.END_TIME, currentTime);
468                                                                log.debug("stop task execution: " + t.getJobId() + "_" + t.getId());
469                                                                super.sendCancelGridlet(GridSimTags.GRIDLET_CANCEL, t.getGridlet(), gridletId, t.getUserID());
470                                                        }
471                                                }
472                                                else if(responseEvent == SchedulingResponseType.STOP_AND_RESUME_FROM_CHECKPOINT){
473                                                        if(t.getStatus() == Gridlet.INEXEC){
474                                                                gridletPause(t.getGridletID(), t.getUserID(), false);
475                                                                gridletResume(t.getGridletID(), t.getUserID(), false);
476                                                                requestedTimeExpiredHandler.cancel(t);
477                                                               
478                                                        }
479                                                }
480                                                else if(responseEvent == SchedulingResponseType.ONE_HOUR_GRACE_PERIOD){
481                                                        if(t.getStatus() == Gridlet.INEXEC){
482                                                                requestedTimeExpiredHandler.notify(3600, t);
483                                                                return;
484                                                        }
485                                                }
486                                                else if(responseEvent == SchedulingResponseType.EXECUTE_ANYWAY){
487                                                        if(t.getStatus() == Gridlet.INEXEC){
488                                                                return;
489                                                        }
490                                                }
491                                                if(pluginSupportsEvent(GssimTags.TASK_EXECUTION_FINISHED)){
492                                                        SchedulingEvent newEvent = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
493                                                        arSchedulingPlugin.schedule(newEvent,
494                                                                                                                inExecution,
495                                                                                                                queues,
496                                                                                                                getResourceUnitsManager(),
497                                                                                                                reservationManager);
498                                                        arSchedulingPlugin.schedule(newEvent,
499                                                                        inExecution,
500                                                                        queues,
501                                                                        getResourceUnitsManager());
502                                                       
503                                                }
504
505                                        }
506                                       
507                                break;
508                }
509        }
510       
511        public Collection<Reservation> getReservations(){
512                return this.reservationManager.getReservations();
513        }
514       
515        protected ExecTaskInterface<?> findTask(Reservation r, List<Queue<? extends TaskInterface<?>>> queues){
516                ExecTaskInterface<?> task = null;
517               
518                for(int i = 0; i < queues.size() && task == null; i++){
519                        Queue<? extends TaskInterface<?>> queue = queues.get(i);
520                        for(int j = 0; j < queue.size() && task == null; j++){
521                                TaskInterface<?> t = queue.get(j);
522                                if(t.getJobId().equals(r.getJobId()) &&
523                                        t.getId().equals(r.getTaskId())){
524                                        task = (ExecTaskInterface<?>)t;
525                                }
526                        }
527                }
528               
529                return task;
530        }
531       
532        public long execute(TaskInterface<?> task){
533               
534                SubmittedTask rgl = (SubmittedTask) task;
535
536                boolean slotIsFree = checkFreeSlot(task);
537                if(slotIsFree == false){
538                        return -1;
539                }
540               
541                ResourceAllocationInterface resUnitsManager = getAllocationManager();
542                Map<ResourceParameterName, ResourceUnit> choosenResources = null;
543               
544                choosenResources = schedulingPlugin.chooseResourcesFor(rgl, getResourceUnitsManager());
545               
546                if (choosenResources == null) {
547                        return -1;
548                }
549
550                resUnitsManager.allocateResources(choosenResources);
551
552                double taskLength = rgl.getRemainingGridletLength();
553                int time = Double.valueOf(
554                                forecastFinishTimePlugin.execTimeEstimation(choosenResources, task, taskLength)).intValue();
555
556                if(time < 0.0)
557                        return time;
558               
559                rgl.setEstimatedDuration(time);
560               
561                DateTime currentTime = new DateTime();
562                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
563                rgl.addUsedResources(resHistItem);
564               
565                Map<String, Object> historyItem = new HashMap<String, Object>();
566                List<ResourceHistoryItem> list = new ArrayList<ResourceHistoryItem>(1);
567                list.add(resHistItem);
568                historyItem.put(GssimConstants.RESOURCES, list);
569                historyItem.put(GssimConstants.START_TIME, currentTime);
570                currentTime = currentTime.plusSeconds(time);
571                historyItem.put(GssimConstants.END_TIME, currentTime);
572
573                rgl.setFinishTime(currentTime.getMillis()/1000);
574                finishTaskHandler.notify(time, rgl);
575
576                try {
577                        long expectedDuration = rgl.getExpectedDuration().getMillis()/1000;
578                        requestedTimeExpiredHandler.notify(expectedDuration, rgl);
579                } catch (NoSuchFieldException e) {
580                        log.warn(e.getMessage());
581                }
582               
583                //save history
584                history.put(Integer.valueOf(rgl.getGridletID()), historyItem);
585               
586                return time;
587        }
588       
589        private boolean checkFreeSlot(TaskInterface<?> task){
590               
591                SubmittedTask submittedTask = (SubmittedTask) task;
592                if(submittedTask.getReservationID() != -1)
593                        return true;
594               
595                long expectedRuntime = getExpectedRuntime(task);
596                if(expectedRuntime < 0)
597                        return false;
598               
599                int cpuCnt = 0;
600                try {
601                        cpuCnt = Double.valueOf(submittedTask.getCpuCntRequest()).intValue();
602                } catch (NoSuchFieldException e) {
603                        cpuCnt = 1;
604                }
605               
606                Processors processors = new Processors(this.provider.getProviderId(), this.getResourceUnitsManager().getProcessors().getAmount(), cpuCnt);
607                DateTime startTime = new DateTime();
608                DateTime endTime = startTime.plusSeconds(Long.valueOf(expectedRuntime).intValue());
609
610                List<TimeResourceAllocation> usageList = null;
611               
612                try {
613                        usageList = reservationManager.resourceUsageList(processors, inExecution, startTime, endTime);
614                        long reqDuration = (long)expectedRuntime * 1000;
615                        long duration = reqDuration;
616                        int reqCpu =  cpuCnt;
617                        DateTime start = null;
618                        for(int i = 0; i < usageList.size() && duration > 0; i++){
619                                TimeResourceAllocation allocation = usageList.get(i);
620                                ResourceUnit unit = allocation.getAllocatedResource().
621                                                                                                getResourceUnit(ResourceParameterName.CPUCOUNT);
622                                if(unit.getFreeAmount() >= reqCpu){
623                                        duration = duration - (allocation.getEndMillis() - allocation.getStartMillis());
624                                        if(start == null)
625                                                start = allocation.getStart();
626                                } else {
627                                        duration = reqDuration;
628                                        start = null;
629                                }
630                        }
631                        if(duration > 0){
632                                return false;
633                        }
634                } catch (NoSuchFieldException e) {
635                        return false;
636                }
637
638                return true;
639        }
640       
641        public long getExpectedRuntime(TaskInterface<?> task){
642
643                SubmittedTask submittedTask = (SubmittedTask) task;
644                long expectedDuration;
645                try {
646                        expectedDuration = submittedTask.getExpectedDuration().getMillis()/1000;
647                } catch (NoSuchFieldException e) {
648                        //log.warn(e.getMessage());
649                        // assume that task without defined requested time will not be performed;
650                        return -1;
651                        //expectedDuration = execTimeEstimation(task);
652                        //if executing time for a given task is not defined then assume that it
653                        //can be performed anyway ant set expected duration to 0
654                        //expectedDuration = 0;
655                }
656                long expectedRuntime = -1;
657                if(task.getStatus() ==  Gridlet.READY)
658                        expectedRuntime = expectedDuration;
659                else if(task.getStatus() ==  Gridlet.INEXEC)
660                {       
661                        double execStartTime = submittedTask.getExecStartTime();
662                        long currentTimeMillis = new DateTime().getMillis()/1000;
663                        expectedRuntime = (Double.valueOf(execStartTime).longValue() + expectedDuration - currentTimeMillis);
664                }
665                return expectedRuntime;
666        }
667
668        public void gridletPause(int gridletId, int userId, boolean ack) {
669                updateProcessingTimes();
670                SubmittedTask submittedTask = submitedGridlets.get(new Integer(gridletId));
671                if (submittedTask == null) {
672                        if(log.isInfoEnabled())
673                                log.info(super.getName() + ".gridletPause(): Cannot " + "find Gridlet #" + gridletId + " for User #" + userId);
674                        return;
675                } else if (submittedTask.getGridletStatus() == Gridlet.INEXEC) {
676                        submittedTask.setGridletStatus(Gridlet.PAUSED);
677                        inExecution.remove(submittedTask);
678                        finishTaskHandler.cancel(submittedTask);
679                }               
680        }
681       
682        public void gridletResume(int gridletId, int userId, boolean ack) {
683                updateProcessingTimes();
684                Integer key = new Integer(gridletId);
685                SubmittedTask submittedTask = submitedGridlets.get(key);
686               
687                if (submittedTask != null && submittedTask.getGridletStatus() == Gridlet.PAUSED) {
688                        //the gridlet has status paused
689                        newTasks.add(submittedTask);
690                        try {
691                                arSchedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceUnitsManager(), reservationManager);
692                        } catch (ReservationException e) {
693                                sendException(userId, e);
694                        }
695                        if(pluginSupportsEvent(GssimTags.GRIDLET_RESUME))
696                                submittedTask.setGridletStatus(Gridlet.READY);
697                                arSchedulingPlugin.schedule(new SchedulingEvent(SchedulingEventType.TASK_FINISHED),
698                                                                                inExecution, queues, getResourceUnitsManager(), reservationManager);
699                                arSchedulingPlugin.schedule(new SchedulingEvent(SchedulingEventType.TASK_FINISHED),
700                                                                                        inExecution, queues, getResourceUnitsManager());
701                } else {
702                        if(log.isInfoEnabled())
703                                log.info(super.getName() + ".gridletResume(): Cannot " + "find paused Gridlet #" + gridletId + " for User #" + userId);
704                }
705        }
706}
Note: See TracBrowser for help on using the repository browser.