source: DCWoRMS/trunk/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 767

Revision 767, 20.2 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.Accumulator;
8import gridsim.ResourceCalendar;
9import gridsim.dcworms.DCWormsTags;
10import gridsim.dcworms.filter.ExecTaskFilter;
11
12import java.util.ArrayList;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17
18import org.apache.commons.lang.ArrayUtils;
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeUtilsExt;
23import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.ResourceController;
27import schedframe.events.scheduling.SchedulingEvent;
28import schedframe.events.scheduling.SchedulingEventType;
29import schedframe.events.scheduling.StartTaskExecutionEvent;
30import schedframe.events.scheduling.TaskFinishedEvent;
31import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
32import schedframe.exceptions.ResourceException;
33import schedframe.resources.computing.ComputingResource;
34import schedframe.resources.computing.profiles.energy.EnergyEvent;
35import schedframe.resources.computing.profiles.energy.EnergyEventType;
36import schedframe.resources.units.PEUnit;
37import schedframe.resources.units.ProcessingElements;
38import schedframe.resources.units.ResourceUnit;
39import schedframe.resources.units.ResourceUnitName;
40import schedframe.resources.units.StandardResourceUnitName;
41import schedframe.scheduling.ResourceHistoryItem;
42import schedframe.scheduling.Scheduler;
43import schedframe.scheduling.TaskList;
44import schedframe.scheduling.TaskListImpl;
45import schedframe.scheduling.UsedResourcesList;
46import schedframe.scheduling.WorkloadUnitHandler;
47import schedframe.scheduling.manager.resources.LocalResourceManager;
48import schedframe.scheduling.manager.resources.ManagedResources;
49import schedframe.scheduling.manager.resources.ResourceManager;
50import schedframe.scheduling.plan.AllocationInterface;
51import schedframe.scheduling.plan.ScheduledTaskInterface;
52import schedframe.scheduling.plan.SchedulingPlanInterface;
53import schedframe.scheduling.plugin.SchedulingPlugin;
54import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
55import schedframe.scheduling.plugin.grid.ModuleListImpl;
56import schedframe.scheduling.plugin.grid.ModuleType;
57import schedframe.scheduling.policy.AbstractManagementSystem;
58import schedframe.scheduling.queue.TaskQueueList;
59import schedframe.scheduling.tasks.AbstractProcesses;
60import schedframe.scheduling.tasks.Job;
61import schedframe.scheduling.tasks.JobInterface;
62import schedframe.scheduling.tasks.Task;
63import schedframe.scheduling.tasks.TaskInterface;
64import schedframe.scheduling.tasks.WorkloadUnit;
65import simulator.DCWormsConstants;
66import simulator.utils.DoubleMath;
67
68public class LocalManagementSystem extends AbstractManagementSystem {
69
70        private Log log = LogFactory.getLog(LocalManagementSystem.class);
71
72        protected double lastUpdateTime;
73
74        protected Accumulator accTotalLoad;
75
76        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
77                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
78                        throws Exception {
79
80                super(providerId, entityName, execTimeEstimationPlugin, queues);
81               
82                if (schedPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85                this.schedulingPlugin =  schedPlugin;
86                this.moduleList = new ModuleListImpl(1);
87               
88                this.accTotalLoad = new Accumulator();
89        }
90
91        public void init(Scheduler sched, ManagedResources managedResources) {
92                super.init(sched, managedResources);
93                double load = 0;
94                accTotalLoad.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102                Object obj;
103
104                switch (tag) {
105
106                case DCWormsTags.TIMER:
107                        if (pluginSupportsEvent(tag)) {
108                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
109                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
110                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
111                                executeSchedulingPlan(decision);
112                        }
113                        sendTimerEvent();
114                        break;
115
116                case DCWormsTags.TASK_READY_FOR_EXECUTION:
117                        ExecTask execTask = (ExecTask) ev.get_data();
118                        try {
119                                execTask.setStatus(DCWormsTags.READY);
120                                if (pluginSupportsEvent(tag)) {
121                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
122                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
123                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
124                                        executeSchedulingPlan(decision);
125                                }
126                        } catch (Exception e) {
127                                e.printStackTrace();
128                        }
129                        break;
130
131                case DCWormsTags.TASK_EXECUTION_FINISHED:
132                        obj = ev.get_data();
133                        execTask = (ExecTask) obj;
134                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
135                               
136                                finalizeExecutable(execTask);
137                                sendFinishedWorkloadUnit(execTask);
138                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
139                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
140                                if (pluginSupportsEvent(tag)) {
141                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
142                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
143                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
144                                        executeSchedulingPlan(decision);
145                                }
146                        }
147
148                        Job job = jobRegistry.getJob(execTask.getJobId());
149                        if(!job.isFinished()){
150                                getWorkloadUnitHandler().handleJob(job);
151                        }
152                        break;
153                       
154                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
155                        obj = ev.get_data();
156                        execTask = (Executable) obj;
157                        if (pluginSupportsEvent(tag)) {
158                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
159                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
160                                                queues, getJobRegistry(), getResourceManager(), moduleList);
161                                executeSchedulingPlan(decision);
162                        }
163                        break;
164                       
165                case DCWormsTags.UPDATE:
166                        updateProcessingTimes(ev);
167                        break;
168                }
169        }
170
171        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
172                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
173                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
174                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
175                                        queues, getJobRegistry(), getResourceManager(), moduleList);
176                        executeSchedulingPlan(decision);
177                }
178                //if(scheduler.getParent() != null){
179                        sendFinishedWorkloadUnit(wu);
180                //}
181        }
182       
183        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
184
185                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
186                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
187                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
188
189                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
190                                continue;
191                        }
192
193                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
194
195                        TaskInterface<?> task = taskDecision.getTask();
196                        for (int j = 0; j < allocations.size(); j++) {
197
198                                AllocationInterface<?> allocation = allocations.get(j);
199                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
200                                        ExecTask exec = (ExecTask) task;                                       
201                                        executeTask(exec, allocation.getRequestedResources());
202                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
203                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
204                                        submitTask(task, allocation);
205                                } else {
206                                        ExecTask exec = (ExecTask) task;
207                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
208                                }
209                        }
210                }
211        }
212
213        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
214
215                Executable exec = (Executable)task;
216                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
217                if(allocationStatus == false){
218                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is availiable at this moment.");
219                        return;
220                }
221
222                removeFromQueue(task);
223
224                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.START_TASK_EXECUTION);
225                int time = Double.valueOf(
226                                execTimeEstimationPlugin.execTimeEstimation(event, task, choosenResources, exec.getCompletionPercentage())).intValue();
227                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime()
228                                + " will finish after " + time);
229
230                if (time < 0.0)
231                        return;
232
233                exec.setEstimatedDuration(time);
234                DateTime currentTime = new DateTime();
235                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
236                exec.addUsedResources(resHistItem);
237                try {
238                        exec.setStatus(DCWormsTags.INEXEC);
239                } catch (Exception e) {
240                        // TODO Auto-generated catch block
241                        e.printStackTrace();
242                }
243                scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
244
245                try {
246                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
247                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
248                } catch (NoSuchFieldException e) {
249                        double t = exec.getEstimatedDuration();
250                        scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
251                }
252
253                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
254               
255                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
256               
257                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
258               
259                /*if(peUnit instanceof ProcessingElements){
260                        ProcessingElements pes = (ProcessingElements) peUnit;
261                        for (ComputingResource resource : pes) {
262                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec));
263                        }
264                } else {
265                        ComputingResource resource = null;
266                        try {
267                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
268                        } catch (ResourceException e) {
269                                return;
270                        }
271                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_STARTED, exec));
272                }
273*/
274
275                /*for(ExecTaskInterface etask : jobRegistry.getRunningTasks()){
276                        System.out.println(etask.getJobId());
277                        for(String taskId: etask.getVisitedResources())
278                                System.out.println("====="+taskId);
279                }*/
280        }
281       
282        public void finalizeExecutable(ExecTask execTask){
283               
284                Executable exec = (Executable)execTask;
285                exec.finalizeExecutable();
286               
287                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
288                scheduler.sim_cancel(filter, null);
289               
290                Task task;
291                Job job = jobRegistry.getJob(exec.getJobId());
292                try {
293                        task = job.getTask(exec.getTaskId());
294                } catch (NoSuchFieldException e) {
295                        return;
296                }
297                if(exec.getProcessesId() == null){
298                        try {
299                                task.setStatus(exec.getStatus());
300                        } catch (Exception e) {
301                                e.printStackTrace();
302                        }
303                } else {
304                        List<AbstractProcesses> processesList = task.getProcesses();
305                        for(int i = 0; i < processesList.size(); i++){
306                                AbstractProcesses processes = processesList.get(i);
307                                if(processes.getId().equals(exec.getProcessesId())){
308                                        processes.setStatus(exec.getStatus());
309                                        break;
310                                }
311                        }
312                }
313               
314                UsedResourcesList lastUsedList = exec.getUsedResources();
315                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
316                                .getResourceUnits();
317                getAllocationManager().freeResources(lastUsed);
318               
319                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
320                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
321                /*if(peUnit instanceof ProcessingElements){
322                        ProcessingElements pes = (ProcessingElements) peUnit;
323                        for (ComputingResource resource : pes) {
324                                resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
325                        }
326                } else {
327                        ComputingResource resource = null;
328                        try {
329                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
330                        } catch (ResourceException e) {
331                                return;
332                        }
333                        resource.handleEvent(new EnergyEvent(EnergyEventType.TASK_FINISHED, exec));
334                }*/
335
336                //sendFinishedWorkloadUnit(executable);
337        }
338       
339        protected void updateProcessingProgress() {
340                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
341                if (timeSpan <= 0.0) {
342                        // don't update when nothing changed
343                        return;
344                }
345                lastUpdateTime = Sim_system.clock();
346                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
347                while (iter.hasNext()) {
348                        ExecTask task = iter.next();
349                        Executable exec = (Executable)task;
350                        //exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * timeSpan/exec.getEstimatedDuration());
351                        exec.setCompletionPercentage(exec.getCompletionPercentage() + (100 - exec.getCompletionPercentage())  * timeSpan/(exec.getEstimatedDuration() - new DateTime().getMillis()/1000 + exec.getExecStartTime() + timeSpan));
352                       
353                        UsedResourcesList usedResourcesList = exec.getUsedResources();
354                        PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits()
355                                        .get(StandardResourceUnitName.PE);
356                        double load = getMIShare(timeSpan, peUnit);
357                        addTotalLoad(load);
358                }
359        }
360       
361        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
362
363                if(peUnit instanceof ProcessingElements){
364                        ProcessingElements pes = (ProcessingElements) peUnit;
365                        for (ComputingResource resource : pes) {
366                                resource.handleEvent(new EnergyEvent(eventType, obj));
367                        }
368                } else {
369                        ComputingResource resource = null;
370                        try {
371                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
372                        } catch (ResourceException e) {
373                                return;
374                        }
375                        resource.handleEvent(new EnergyEvent(eventType, obj));
376                }
377        }
378       
379        private double getMIShare(double timeSpan, PEUnit pes) {
380                double localLoad;
381                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
382                if (resCalendar == null)
383                        localLoad = 0;
384                else
385                        // 1 - localLoad_ = available MI share percentage
386                        localLoad = resCalendar.getCurrentLoad();
387
388                int speed = pes.getSpeed();
389                int cnt = pes.getAmount();
390
391                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
392                return totalMI;
393        }
394
395        protected void updateProcessingTimes(Sim_event ev) {
396                updateProcessingProgress();
397                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
398                        Executable exec = (Executable)execTask;
399                        List<String> visitedResource = exec.getVisitedResources();
400                        String originResource = ev.get_data().toString();
401                        if(!ArrayUtils.contains(visitedResource.toArray(new String[visitedResource.size()]), originResource)){
402                                continue;
403                        }
404                       
405                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
406                        int time = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
407                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
408
409                        //check if the new estimated end time is equal to the previous one; if yes the continue without update
410                        if(DoubleMath.subtract((exec.getExecStartTime() + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + time)) == 0.0){
411                                continue;
412                        }
413                        //exec.setEstimatedDuration(time);
414                        exec.setEstimatedDuration(Long.valueOf(new DateTime().getMillis()/1000).intValue() - Double.valueOf(exec.getExecStartTime()).intValue() + time);
415                        ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
416                        scheduler.sim_cancel(filter, null);
417                        scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
418                }
419        }       
420
421        public double calculateTotalLoad(int size) {
422                // background load, defined during initialization
423                double load;
424                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
425                if (resCalendar == null)
426                        load = 0;
427                else
428                        load = resCalendar.getCurrentLoad();
429
430                int numberOfPE = 0;
431                try {
432                        for(ResourceUnit resUnit : getResourceManager().getPE()){
433                                numberOfPE = numberOfPE + resUnit.getAmount();
434                        }
435                } catch (Exception e) {
436                        numberOfPE = 1;
437                }
438                double tasksPerPE = (double) size / numberOfPE;
439                load += Math.min(1.0 - load, tasksPerPE);
440
441                return load;
442        }
443
444        public Accumulator getTotalLoad() {
445                return accTotalLoad;
446        }
447
448        protected void addTotalLoad(double load) {
449                accTotalLoad.add(load);
450        }
451       
452        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
453                        ExecTask task) {
454
455                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
456                LocalResourceManager resourceManager = getResourceManager();
457                if(resourceName != null){
458                        ComputingResource resource = null;
459                        try {
460                                resource = resourceManager.getResourceByName(resourceName);
461                        } catch (ResourceException e) {
462                                return null;
463                        }
464                        resourceManager = new LocalResourceManager(resource);
465                }
466
467                int cpuRequest;
468                try {
469                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
470                } catch (NoSuchFieldException e) {
471                        cpuRequest = 1;
472                }
473
474                if (cpuRequest != 0) {
475                       
476                        List<ResourceUnit> availableUnits = null;
477                        try {
478                                availableUnits = resourceManager.getPE();
479                        } catch (ResourceException e) {
480                                return null;
481                        }
482                       
483                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
484                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
485                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
486                                if(peUnit.getFreeAmount() > 0){
487                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
488                                        cpuRequest = cpuRequest - allocPE;
489                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
490                                }       
491                        }
492                       
493                        if(cpuRequest > 0){
494                                return null;
495                        }
496                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
497                }
498
499                return  map;
500        }
501       
502        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
503                updateProcessingProgress();
504                registerWorkloadUnit(wu);
505        }
506
507        private void registerWorkloadUnit(WorkloadUnit wu){
508                if(!wu.isRegistered()){
509                        wu.register(jobRegistry);
510                }
511                wu.accept(getWorkloadUnitHandler());
512        }
513       
514        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
515               
516                public void handleJob(JobInterface<?> job){
517
518                        if (log.isInfoEnabled())
519                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
520
521                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
522                        jobsList.add(job);
523                        TaskListImpl availableTasks = new TaskListImpl();
524                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
525                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
526                                availableTasks.add(task);
527                        }
528
529                        for(TaskInterface<?> task: availableTasks){     
530                                registerWorkloadUnit(task);
531                        }
532                }
533               
534                public void handleTask(TaskInterface<?> t){
535                        Task task = (Task)t;
536                        List<AbstractProcesses> processes = task.getProcesses();
537
538                        if(processes == null || processes.size() == 0){
539                                Executable exec = new Executable(task);
540                                registerWorkloadUnit(exec);
541                        } else {
542                                for(int j = 0; j < processes.size(); j++){
543                                        AbstractProcesses procesesSet = processes.get(j);
544                                        Executable exec = new Executable(task, procesesSet);
545                                        registerWorkloadUnit(exec);
546                                }
547                        }
548                }
549               
550                public void handleExecutable(ExecTask task){
551                       
552                        Executable exec = (Executable) task;
553                        jobRegistry.addExecTask(exec);
554                       
555                        exec.trackResource(scheduler.get_name());
556                        Scheduler parentScheduler = scheduler.getParent();
557                        List<String> visitedResource = exec.getVisitedResources();
558                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
559                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
560                                exec.trackResource(parentScheduler.get_name());
561                                parentScheduler = parentScheduler.getParent();
562                        }
563                        exec.setSchedulerName(scheduler.get_id());
564                       
565                        TaskList newTasks = new TaskListImpl();
566                        newTasks.add(exec);
567               
568                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
569
570                        if (exec.getStatus() == DCWormsTags.QUEUED) {
571                                sendExecutableReadyEvent(exec);
572                        }
573                }
574        }
575
576        public WorkloadUnitHandler getWorkloadUnitHandler() {
577                return new LocalWorkloadUnitHandler();
578        }
579       
580       
581        public LocalResourceManager getResourceManager() {
582                if (resourceManager instanceof ResourceManager)
583                        return (LocalResourceManager) resourceManager;
584                else
585                        return null;
586        }
587       
588}
Note: See TracBrowser for help on using the repository browser.