source: DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 481

Revision 481, 20.8 KB checked in by wojtekp, 13 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import eduni.simjava.Sim_event;
4import eduni.simjava.Sim_system;
5import gridsim.Accumulator;
6import gridsim.ResourceCalendar;
7import gridsim.gssim.DCWormsTags;
8import gridsim.gssim.filter.ExecTaskFilter;
9import gssim.schedframe.scheduling.ExecTask;
10import gssim.schedframe.scheduling.Executable;
11
12import java.util.ArrayList;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17
18import org.apache.commons.lang.ArrayUtils;
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeUtilsExt;
23import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.ResourceController;
27import schedframe.events.scheduling.SchedulingEvent;
28import schedframe.events.scheduling.SchedulingEventType;
29import schedframe.events.scheduling.StartTaskExecutionEvent;
30import schedframe.events.scheduling.TaskFinishedEvent;
31import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
32import schedframe.exceptions.ResourceException;
33import schedframe.resources.computing.ComputingResource;
34import schedframe.resources.computing.profiles.energy.EnergyEvent;
35import schedframe.resources.computing.profiles.energy.EnergyEventType;
36import schedframe.resources.units.PEUnit;
37import schedframe.resources.units.ProcessingElements;
38import schedframe.resources.units.ResourceUnit;
39import schedframe.resources.units.ResourceUnitName;
40import schedframe.resources.units.StandardResourceUnitName;
41import schedframe.scheduling.ResourceHistoryItem;
42import schedframe.scheduling.Scheduler;
43import schedframe.scheduling.TaskListImpl;
44import schedframe.scheduling.UsedResourceList;
45import schedframe.scheduling.WorkloadUnitHandler;
46import schedframe.scheduling.manager.resources.LocalResourceManager;
47import schedframe.scheduling.manager.resources.ManagedResources;
48import schedframe.scheduling.manager.resources.ResourceManager;
49import schedframe.scheduling.plan.AllocationInterface;
50import schedframe.scheduling.plan.ScheduledTaskInterface;
51import schedframe.scheduling.plan.SchedulingPlanInterface;
52import schedframe.scheduling.plugin.SchedulingPlugin;
53import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
54import schedframe.scheduling.plugin.grid.ModuleListImpl;
55import schedframe.scheduling.plugin.grid.ModuleType;
56import schedframe.scheduling.policy.AbstractManagementSystem;
57import schedframe.scheduling.queue.TaskQueueList;
58import schedframe.scheduling.tasks.AbstractProcesses;
59import schedframe.scheduling.tasks.Job;
60import schedframe.scheduling.tasks.JobInterface;
61import schedframe.scheduling.tasks.Task;
62import schedframe.scheduling.tasks.TaskInterface;
63import schedframe.scheduling.tasks.WorkloadUnit;
64import simulator.DCWormsConstants;
65import simulator.utils.DoubleMath;
66
67public class LocalManagementSystem extends AbstractManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        protected Accumulator accTotalLoad;
74
75        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
76                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
77                        throws Exception {
78
79                super(providerId, entityName, execTimeEstimationPlugin, queues);
80
81                //schedulingPlugin = (LocalSchedulingPlugin) InstanceFactory.createInstance(schedulingPluginClassName, LocalSchedulingPlugin.class);
82               
83                if (schedPlugin == null) {
84                        throw new Exception("Can not create local scheduling plugin instance.");
85                }
86                this.schedulingPlugin =  schedPlugin;
87                accTotalLoad = new Accumulator();
88                moduleList = new ModuleListImpl(1);
89
90        }
91
92        public void init(Scheduler sched, ManagedResources managedResources) {
93                super.init(sched, managedResources);
94                //scheduler = sched;
95                //resourceManager = ResourceManagerFactory.createResourceManager(scheduler);
96                double load = 0;
97                accTotalLoad.add(load);
98        }
99
100        public void processEvent(Sim_event ev) {
101
102                updateProcessingProgress();
103
104                int tag = ev.get_tag();
105                Object obj;
106
107                switch (tag) {
108
109                case DCWormsTags.TIMER:
110                        if (pluginSupportsEvent(tag)) {
111                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
112                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
113                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
114                                executeSchedulingPlan(decision);
115                        }
116                        sendTimerEvent();
117
118                        break;
119
120                case DCWormsTags.TASK_READY_FOR_EXECUTION:
121                       
122                        ExecTask data = (ExecTask) ev.get_data();
123                        try {
124                                data.setStatus(DCWormsTags.READY);
125                                if (pluginSupportsEvent(tag)) {
126                                        SchedulingEvent event = new StartTaskExecutionEvent(data.getJobId(), data.getId());
127                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
128                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
129                                        executeSchedulingPlan(decision);
130                                }
131                        } catch (Exception e) {
132                                e.printStackTrace();
133                        }
134                        break;
135
136                case DCWormsTags.TASK_EXECUTION_FINISHED:
137                        obj = ev.get_data();
138                        ExecTask exec = (ExecTask) obj;
139                        if (exec.getStatus() == DCWormsTags.INEXEC) {
140                                finalizeExecutable(exec);
141
142                                sendFinishedWorkloadUnit(exec);
143                                //task.setGridletStatus(Gridlet.SUCCESS);
144                                //task.finalizeGridlet();
145                                log.debug(exec.getJobId() + "_" + exec.getId() + " finished execution on " + new DateTime());
146                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
147                                /*UsedResourceList<ResourceHistoryItem> lastUsedList = task.getUsedResources();
148                                Map<ResourceUnitName, AbstractResourceUnit> lastUsed = lastUsedList.getLast()
149                                                .getResourceUnits();
150                                getAllocationManager().freeResources(lastUsed);
151                                ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PROCESSINGELEMENTS);
152                                for (ComputingResource resource : pes) {
153                                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, task));
154                                }
155                                SubTaskFilter filter = new SubTaskFilter(task.getGridletID(), GssimTags.TASK_REQUESTED_TIME_EXPIRED);
156                                scheduler.sim_cancel(filter, null);
157                                super.sendFinishJob((Executable) task.getGridlet());*/
158                        }
159                        if (pluginSupportsEvent(tag)) {
160                                SchedulingEvent event = new TaskFinishedEvent(exec.getJobId(), exec.getId());
161                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
162                                                queues, getJobRegistry(), getResourceManager(), moduleList);
163                                executeSchedulingPlan(decision);
164                        }
165                        Job job = jobRegistry.getJob(exec.getJobId());
166                        if(!job.isFinished()){
167                                getWorkloadUnitHandler().handleJob(job);
168                        }
169
170                        break;
171                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
172                        obj = ev.get_data();
173                        exec = (Executable) obj;
174                        if (pluginSupportsEvent(tag)) {
175                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(exec.getJobId(), exec.getId());
176                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
177                                                queues, getJobRegistry(), getResourceManager(), moduleList);
178                                executeSchedulingPlan(decision);
179                        }
180
181                        break;
182                case DCWormsTags.UPDATE:
183                        updateProcessingTimes(ev);
184                        break;
185                }
186        }
187       
188
189        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
190                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
191                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
192                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
193                                        queues, getJobRegistry(), getResourceManager(), moduleList);
194                        executeSchedulingPlan(decision);
195                }
196                //if(scheduler.getParent() != null){
197                        sendFinishedWorkloadUnit(wu);
198                //}
199        }
200       
201        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
202
203                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
204                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
205                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
206
207                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
208                                continue;
209                        }
210
211                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
212
213                        TaskInterface<?> task = taskDecision.getTask();
214                        for (int j = 0; j < allocations.size(); j++) {
215
216                                AllocationInterface<?> allocation = allocations.get(j);
217                                if (allocation.isProcessing()) {
218                                        ExecTask exec = (ExecTask) task;                                       
219                                        executeTask(exec, allocation.getRequestedResources());
220                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
221                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
222                                        submitTask(task, allocation);
223                                } else {
224                                        ExecTask exec = (ExecTask) task;
225                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), (ExecTask)task));
226                                }
227                        }
228
229                }
230        }
231
232        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
233
234                Executable exec = (Executable)task;
235                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
236                if(allocationStatus == false)
237                        return;
238                removeFromQueue(task);
239                //double completionPercentage = (submittedTask.getLength() - submittedTask.getRemainingGridletLength())/submittedTask.getLength();
240                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
241                int time = Double.valueOf(
242                                execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue();
243                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
244                                + " will finish after " + time);
245
246                if (time < 0.0)
247                        return;
248
249                exec.setEstimatedDuration(time);
250                DateTime currentTime = new DateTime();
251                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
252                exec.addUsedResources(resHistItem);
253
254                scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED,
255                                exec);
256
257                try {
258                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
259                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
260                } catch (NoSuchFieldException e) {
261                        double t = exec.getEstimatedDuration();
262                        scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
263                }
264               
265                try {
266                        exec.setStatus(DCWormsTags.INEXEC);
267                } catch (Exception e1) {
268                        // TODO Auto-generated catch block
269                        e1.printStackTrace();
270                }
271                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
272               
273                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
274                if(peUnit instanceof ProcessingElements){
275                        ProcessingElements pes = (ProcessingElements) peUnit;
276                        for (ComputingResource resource : pes) {
277                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
278                        }
279                } else {
280                        ComputingResource resource = null;
281                        try {
282                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
283                        } catch (ResourceException e) {
284                               
285                        }
286                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
287                }
288                /*ProcessingElements pes = (ProcessingElements) choosenResources.get(StandardResourceUnitName.PE);
289                for (ComputingResource resource : pes) {
290                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, submittedTask));
291                }*/
292
293                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
294                        System.out.println(etask.getJobId());
295                        for(String taskId: etask.getVisitedResources())
296                                System.out.println("====="+taskId);
297                }*/
298        }
299       
300        public void finalizeExecutable(ExecTask execTask){
301               
302                Executable exec = (Executable)execTask;
303                try {
304                        exec.setStatus(DCWormsTags.SUCCESS);
305                } catch (Exception e1) {
306                        // TODO Auto-generated catch block
307                        e1.printStackTrace();
308                }
309                exec.finalizeExecutable();
310                UsedResourceList<ResourceHistoryItem> lastUsedList = exec.getUsedResources();
311                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
312                                .getResourceUnits();
313                getAllocationManager().freeResources(lastUsed);
314               
315                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
316                if(peUnit instanceof ProcessingElements){
317                        ProcessingElements pes = (ProcessingElements) peUnit;
318                        for (ComputingResource resource : pes) {
319                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
320                        }
321                } else {
322                        ComputingResource resource = null;
323                        try {
324                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
325                        } catch (ResourceException e) {
326                               
327                        }
328                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
329                }
330                /*ProcessingElements pes = (ProcessingElements) lastUsed.get(StandardResourceUnitName.PE);
331                for (ComputingResource resource : pes) {
332                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, subTask));
333                }*/
334                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
335                scheduler.sim_cancel(filter, null);
336               
337
338                Job job = jobRegistry.getJob(exec.getJobId());
339
340                Task task = null;
341                try {
342                        task = job.getTask(exec.getTaskId());
343                } catch (NoSuchFieldException e) {
344                        e.printStackTrace();
345                }
346                if(exec.getProcessesId() == null){
347                        try {
348                                task.setStatus(exec.getStatus());
349                        } catch (Exception e) {
350                                e.printStackTrace();
351                        }
352                } else {
353                        List<AbstractProcesses> processesList = task.getProcesses();
354                        for(int i = 0; i < processesList.size(); i++){
355                                AbstractProcesses processes = processesList.get(i);
356                                if(processes.getId().equals(exec.getProcessesId())){
357                                        processes.setStatus(exec.getStatus());
358                                        break;
359                                }
360                        }
361                }
362                //sendFinishedWorkloadUnit(executable);
363        }
364       
365        protected void updateProcessingProgress() {
366                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
367                if (timeSpan <= 0.0) {
368                        // don't update when nothing changed
369                        return;
370                }
371                lastUpdateTime = Sim_system.clock();
372                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
373                while (iter.hasNext()) {
374                        ExecTask task = iter.next();
375                        Executable exec = (Executable)task;
376                        UsedResourceList<ResourceHistoryItem> usedResourcesList = exec.getUsedResources();
377                        ResourceUnit unit = usedResourcesList.getLast().getResourceUnits()
378                                        .get(StandardResourceUnitName.PE);
379
380                        double load = getMIShare(timeSpan, (PEUnit) unit);
381                        exec.setCompletionPercentage(100 * timeSpan/exec.getEstimatedDuration());
382                        addTotalLoad(load);
383                }
384        }
385
386        private double getMIShare(double timeSpan, PEUnit pes) {
387                double localLoad;
388                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
389                if (resCalendar == null)
390                        localLoad = 0;
391                else
392                        // 1 - localLoad_ = available MI share percentage
393                        localLoad = resCalendar.getCurrentLoad();
394
395                int speed = pes.getSpeed();
396                int cnt = pes.getAmount();
397
398                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
399                return totalMI;
400        }
401
402        protected void updateProcessingTimes(Sim_event ev) {
403                updateProcessingProgress();
404                for (ExecTask task : jobRegistry.getRunningTasks()) {
405                        Executable exec = (Executable)task;
406                        List<String> visitedResource = exec.getVisitedResources();
407                        String originResource = ev.get_data().toString();
408                        if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){
409                                continue;
410                        }
411                       
412                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
413                        //double completionPercentage = (task.getLength() - subTask.getRemainingGridletLength())/task.getLength();
414                        double time = execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
415                                        task, choosenResources, exec.getCompletionPercentage());
416
417                        /*if(!subTask.getVisitedResources().contains(ev.get_data().toString())) {
418                                continue;
419                        }*/
420                        //check if the new estimated end time is equal to the previous one; if yes the continue without update
421                        if( DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
422                                continue;
423                        }
424                        ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
425                        scheduler.sim_cancel(filter, null);
426                        scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, task);
427
428                }
429        }       
430
431        public double calculateTotalLoad(int size) {
432                // background load, defined during initialization
433                double load;
434                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
435                if (resCalendar == null)
436                        load = 0;
437                else
438                        load = resCalendar.getCurrentLoad();
439
440                int numberOfPE = 0;
441                try {
442                        for(ResourceUnit resUnit : getResourceManager().getPE()){
443                                numberOfPE = numberOfPE + resUnit.getAmount();
444                        }
445                        //numberOfPE = getResourceManager().getPE().size();
446                } catch (Exception e) {
447                        numberOfPE = 1;
448                }
449                double tasksPerPE = (double) size / numberOfPE;
450                load += Math.min(1.0 - load, tasksPerPE);
451
452                return load;
453        }
454
455        public Accumulator getTotalLoad() {
456                return accTotalLoad;
457        }
458
459        protected void addTotalLoad(double load) {
460                accTotalLoad.add(load);
461        }
462       
463        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
464                        ExecTask task) {
465
466                ResourceManager resourceManager = this.resourceManager;
467                if(resourceName != null){
468                        ComputingResource resource = null;
469                        try {
470                                resource = resourceManager.getResourceByName(resourceName);
471                        } catch (ResourceException e) {
472                                return null;
473                        }
474
475                        resourceManager = new LocalResourceManager(resource);
476                }
477                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
478
479
480                int cpuRequest;
481                try {
482                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
483                } catch (NoSuchFieldException e) {
484                        cpuRequest = 1;
485                }
486
487                //PEUnit processingUnits = null;
488                if (cpuRequest != 0) {
489                       
490                        List<ResourceUnit> availableUnits = null;
491                        try {
492                                availableUnits = getResourceManager().getPE();
493                        } catch (ResourceException e) {
494                                return null;
495                        }
496                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
497
498                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
499                                PEUnit peUnit = (PEUnit) availableUnits .get(i);
500                                if(peUnit.getFreeAmount() > 0){
501                                        int  allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
502                                        cpuRequest = cpuRequest - allocPE;
503                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
504                                }       
505                        }
506                       
507                        if(cpuRequest > 0){
508                                return null;
509                        }
510
511                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
512                }
513
514                return  map;
515        }
516       
517        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
518                updateProcessingProgress();
519                registerWorkloadUnit(wu);
520        }
521
522        private void registerWorkloadUnit(WorkloadUnit wu){
523                if(!wu.isRegistered()){
524                        wu.register(jobRegistry);
525                }
526                wu.accept(getWorkloadUnitHandler());
527        }
528       
529        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
530               
531                public void handleJob(Job job){
532
533                        if (log.isInfoEnabled())
534                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
535
536                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
537                        jobsList.add(job);
538                        TaskListImpl readyTasks = new TaskListImpl();
539                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
540                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
541                                readyTasks.add(task);
542                        }
543
544                        for(WorkloadUnit e:readyTasks){
545                                registerWorkloadUnit(e);
546                        }
547                }
548               
549                public void handleTask(TaskInterface<?> ti){
550                        Task task = (Task)ti;
551                        List<AbstractProcesses> processes = task.getProcesses();
552
553                        if(processes == null || processes.size() == 0){
554                                Executable exec = new Executable(task);
555                                registerWorkloadUnit(exec);
556                        } else {
557                                for(int j = 0; j < processes.size(); j++){
558                                        AbstractProcesses procesesSet = processes.get(j);
559                                        Executable exec = new Executable(task, procesesSet);
560                                        registerWorkloadUnit(exec);
561                                }
562                        }
563                }
564               
565                public void handleExecutable(ExecTask task){
566                        Executable exec = (Executable) task;
567
568                        // int cost =
569                        // this.resourceManager.getResourceCharacteristic().getResUnits() !=
570                        // null ?
571                        // this.resourceManager.getResourceCharacteristic().getResUnits().get(ResourceParameterName.COST).getAmount()
572                        // : 1;
573
574                        exec.visitResource(scheduler.get_name());
575                        Scheduler parentScheduler = scheduler.getParent();
576                        while (parentScheduler != null && !exec.getVisitedResources().contains(parentScheduler.get_name())) {
577                                exec.visitResource(parentScheduler.get_name());
578                                parentScheduler = parentScheduler.getParent();
579                        }
580                       
581                        exec.setSchedulerName(scheduler.get_id());
582                        jobRegistry.addExecTask(exec);
583                        TaskListImpl newTasks = new TaskListImpl();
584                        newTasks.add(exec);
585               
586                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
587
588                        if (exec.getStatus() == DCWormsTags.QUEUED) {
589                                sendExecutableReadyEvent(exec);
590                        }
591                }
592        }
593
594        public WorkloadUnitHandler getWorkloadUnitHandler() {
595                return new LocalWorkloadUnitHandler();
596        }
597       
598       
599        public LocalResourceManager getResourceManager() {
600                if (resourceManager instanceof ResourceManager)
601                        return (LocalResourceManager) resourceManager;
602                else
603                        return null;
604        }
605       
606}
Note: See TracBrowser for help on using the repository browser.