source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 896

Revision 896, 20.7 KB checked in by wojtekp, 12 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.Accumulator;
8import gridsim.ResourceCalendar;
9import gridsim.dcworms.DCWormsTags;
10import gridsim.dcworms.filter.ExecTaskFilter;
11
12import java.util.ArrayList;
13import java.util.HashMap;
14import java.util.Iterator;
15import java.util.List;
16import java.util.Map;
17
18import org.apache.commons.lang.ArrayUtils;
19import org.apache.commons.logging.Log;
20import org.apache.commons.logging.LogFactory;
21import org.joda.time.DateTime;
22import org.joda.time.DateTimeUtilsExt;
23import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
24
25import qcg.shared.constants.BrokerConstants;
26import schedframe.ResourceController;
27import schedframe.events.scheduling.SchedulingEvent;
28import schedframe.events.scheduling.SchedulingEventType;
29import schedframe.events.scheduling.StartTaskExecutionEvent;
30import schedframe.events.scheduling.TaskFinishedEvent;
31import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
32import schedframe.exceptions.ResourceException;
33import schedframe.resources.computing.ComputingResource;
34import schedframe.resources.computing.profiles.energy.EnergyEvent;
35import schedframe.resources.computing.profiles.energy.EnergyEventType;
36import schedframe.resources.units.PEUnit;
37import schedframe.resources.units.ProcessingElements;
38import schedframe.resources.units.ResourceUnit;
39import schedframe.resources.units.ResourceUnitName;
40import schedframe.resources.units.StandardResourceUnitName;
41import schedframe.scheduling.ResourceHistoryItem;
42import schedframe.scheduling.Scheduler;
43import schedframe.scheduling.TaskList;
44import schedframe.scheduling.TaskListImpl;
45import schedframe.scheduling.UsedResourcesList;
46import schedframe.scheduling.WorkloadUnitHandler;
47import schedframe.scheduling.manager.resources.LocalResourceManager;
48import schedframe.scheduling.manager.resources.ManagedResources;
49import schedframe.scheduling.manager.resources.ResourceManager;
50import schedframe.scheduling.plan.AllocationInterface;
51import schedframe.scheduling.plan.ScheduledTaskInterface;
52import schedframe.scheduling.plan.SchedulingPlanInterface;
53import schedframe.scheduling.plugin.SchedulingPlugin;
54import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
55import schedframe.scheduling.plugin.grid.ModuleListImpl;
56import schedframe.scheduling.plugin.grid.ModuleType;
57import schedframe.scheduling.policy.AbstractManagementSystem;
58import schedframe.scheduling.queue.TaskQueueList;
59import schedframe.scheduling.tasks.AbstractProcesses;
60import schedframe.scheduling.tasks.Job;
61import schedframe.scheduling.tasks.JobInterface;
62import schedframe.scheduling.tasks.Task;
63import schedframe.scheduling.tasks.TaskInterface;
64import schedframe.scheduling.tasks.WorkloadUnit;
65import simulator.DCWormsConstants;
66import simulator.utils.DoubleMath;
67
68public class LocalManagementSystem extends AbstractManagementSystem {
69
70        private Log log = LogFactory.getLog(LocalManagementSystem.class);
71
72        protected double lastUpdateTime;
73
74        protected Accumulator accTotalLoad;
75
76        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
77                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
78                        throws Exception {
79
80                super(providerId, entityName, execTimeEstimationPlugin, queues);
81               
82                if (schedPlugin == null) {
83                        throw new Exception("Can not create local scheduling plugin instance.");
84                }
85                this.schedulingPlugin =  schedPlugin;
86                this.moduleList = new ModuleListImpl(1);
87               
88                this.accTotalLoad = new Accumulator();
89        }
90
91        public void init(Scheduler sched, ManagedResources managedResources) {
92                super.init(sched, managedResources);
93                double load = 0;
94                accTotalLoad.add(load);
95        }
96
97        public void processEvent(Sim_event ev) {
98
99                updateProcessingProgress();
100
101                int tag = ev.get_tag();
102
103                switch (tag) {
104
105                case DCWormsTags.TIMER:
106                        if (pluginSupportsEvent(tag)) {
107                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
108                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
109                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
110                                executeSchedulingPlan(decision);
111                        }
112                        sendTimerEvent();
113                        break;
114
115                case DCWormsTags.TASK_READY_FOR_EXECUTION:
116                        ExecTask execTask = (ExecTask) ev.get_data();
117                        try {
118                                execTask.setStatus(DCWormsTags.READY);
119                                if (pluginSupportsEvent(tag)) {
120                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
121                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
122                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
123                                        executeSchedulingPlan(decision);
124                                }
125                        } catch (Exception e) {
126                                e.printStackTrace();
127                        }
128                        break;
129
130                case DCWormsTags.TASK_EXECUTION_FINISHED:
131                        execTask = (ExecTask) ev.get_data();
132                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
133                               
134                                finalizeExecutable(execTask);
135                                sendFinishedWorkloadUnit(execTask);
136                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
137                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
138                                if (pluginSupportsEvent(tag)) {
139                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
140                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
141                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
142                                        executeSchedulingPlan(decision);
143                                }
144                        }
145
146                        Job job = jobRegistry.getJob(execTask.getJobId());
147                        if(!job.isFinished()){
148                                getWorkloadUnitHandler().handleJob(job);
149                        }
150                        break;
151                       
152                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
153                        execTask = (Executable) ev.get_data();
154                        if (pluginSupportsEvent(tag)) {
155                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
156                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
157                                                queues, getJobRegistry(), getResourceManager(), moduleList);
158                                executeSchedulingPlan(decision);
159                        }
160                        break;
161                       
162                case DCWormsTags.UPDATE_PROCESSING:
163                        updateProcessingTimes(ev);
164                        break;
165                       
166                case DCWormsTags.TASK_EXECUTION_CHANGED:
167                        execTask = (ExecTask) ev.get_data();
168                        updateTaskExecution(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
169                        break;
170                }
171        }
172
173        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
174                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
175                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
176                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
177                                        queues, getJobRegistry(), getResourceManager(), moduleList);
178                        executeSchedulingPlan(decision);
179                }
180                //if(scheduler.getParent() != null){
181                        sendFinishedWorkloadUnit(wu);
182                //}
183        }
184       
185        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
186
187                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
188                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
189                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
190
191                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
192                                continue;
193                        }
194
195                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
196
197                        TaskInterface<?> task = taskDecision.getTask();
198                        for (int j = 0; j < allocations.size(); j++) {
199
200                                AllocationInterface<?> allocation = allocations.get(j);
201                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
202                                        ExecTask exec = (ExecTask) task;                                       
203                                        executeTask(exec, allocation.getRequestedResources());
204                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
205                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
206                                        submitTask(task, allocation);
207                                } else {
208                                        ExecTask exec = (ExecTask) task;
209                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
210                                }
211                        }
212                }
213        }
214
215        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
216
217                Executable exec = (Executable)task;
218                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
219                if(allocationStatus == false){
220                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
221                        return;
222                }
223
224                removeFromQueue(task);
225
226                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
227
228                //if (phaseDuration < 0.0)
229                //      return;
230
231                //exec.setEstimatedDuration(exec.getEstimatedDuration() + phaseDuration);
232                DateTime currentTime = new DateTime();
233                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
234                exec.addUsedResources(resHistItem);
235               
236                try {
237                        exec.setStatus(DCWormsTags.INEXEC);
238                } catch (Exception e) {
239                        // TODO Auto-generated catch block
240                        e.printStackTrace();
241                }
242               
243                updateTaskExecution(exec, SchedulingEventType.START_TASK_EXECUTION);
244                //scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
245
246                try {
247                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
248                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
249                } catch (NoSuchFieldException e) {
250                        //double t = exec.getEstimatedDuration();
251                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
252                }
253
254                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad(jobRegistry.getRunningTasks().size()));
255               
256                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
257               
258                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
259               
260        }
261       
262        public void finalizeExecutable(ExecTask execTask){
263               
264                Executable exec = (Executable)execTask;
265                exec.finalizeExecutable();
266               
267                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
268                scheduler.sim_cancel(filter, null);
269               
270                Task task;
271                Job job = jobRegistry.getJob(exec.getJobId());
272                try {
273                        task = job.getTask(exec.getTaskId());
274                } catch (NoSuchFieldException e) {
275                        return;
276                }
277                if(exec.getProcessesId() == null){
278                        try {
279                                task.setStatus(exec.getStatus());
280                        } catch (Exception e) {
281                                e.printStackTrace();
282                        }
283                } else {
284                        List<AbstractProcesses> processesList = task.getProcesses();
285                        for(int i = 0; i < processesList.size(); i++){
286                                AbstractProcesses processes = processesList.get(i);
287                                if(processes.getId().equals(exec.getProcessesId())){
288                                        processes.setStatus(exec.getStatus());
289                                        break;
290                                }
291                        }
292                }
293               
294                UsedResourcesList lastUsedList = exec.getUsedResources();
295                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
296                                .getResourceUnits();
297                getAllocationManager().freeResources(lastUsed);
298               
299                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
300                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
301        }
302       
303        protected void updateProcessingProgress() {
304                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
305                if (timeSpan <= 0.0) {
306                        // don't update when nothing changed
307                        return;
308                }
309                lastUpdateTime = Sim_system.clock();
310                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
311                while (iter.hasNext()) {
312                        ExecTask task = iter.next();
313                        Executable exec = (Executable)task;
314                        //exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * timeSpan/exec.getEstimatedDuration());
315                        exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / exec.getEstimatedDuration()));
316                        UsedResourcesList usedResourcesList = exec.getUsedResources();
317                        PEUnit peUnit = (PEUnit)usedResourcesList.getLast().getResourceUnits()
318                                        .get(StandardResourceUnitName.PE);
319                        double load = getMIShare(timeSpan, peUnit);
320                        addTotalLoad(load);
321                }
322        }
323       
324        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
325                if(peUnit instanceof ProcessingElements){
326                        ProcessingElements pes = (ProcessingElements) peUnit;
327                        for (ComputingResource resource : pes) {
328                                resource.handleEvent(new EnergyEvent(eventType, obj));
329                        }
330                        /*try {
331                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
332                                        resource.handleEvent(new EnergyEvent(eventType, obj));
333                                }
334                        } catch (ResourceException e) {
335                                // TODO Auto-generated catch block
336                                e.printStackTrace();
337                        }*/
338                } else {
339                        ComputingResource resource = null;
340                        try {
341                                resource = ResourceController.getComputingResourceByName(peUnit.getResourceId());
342                        } catch (ResourceException e) {
343                                return;
344                        }
345                        resource.handleEvent(new EnergyEvent(eventType, obj));
346                }
347        }
348       
349        private double getMIShare(double timeSpan, PEUnit pes) {
350                double localLoad;
351                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
352                if (resCalendar == null)
353                        localLoad = 0;
354                else
355                        // 1 - localLoad_ = available MI share percentage
356                        localLoad = resCalendar.getCurrentLoad();
357
358                int speed = pes.getSpeed();
359                int cnt = pes.getAmount();
360
361                double totalMI = speed * cnt * timeSpan * (1 - localLoad);
362                return totalMI;
363        }
364
365        protected void updateProcessingTimes(Sim_event ev) {
366                updateProcessingProgress();
367                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
368                        Executable exec = (Executable)execTask;
369
370                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
371                        double lastTimeStamp = exec.getUsedResources().getLast().getTimeStamp().getMillis()/1000;
372                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
373                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
374
375                        if(DoubleMath.subtract((lastTimeStamp + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
376                                return;
377                        }
378
379                        exec.setEstimatedDuration(phaseDuration);
380                        DateTime currentTime = new DateTime();
381                        ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
382                        exec.addUsedResources(resHistItem);
383                       
384                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
385                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
386                                scheduler.sim_cancel(filter, null);
387                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
388                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
389                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
390                        } else{
391                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
392                                scheduler.sim_cancel(filter, null);
393                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
394                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
395                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
396                        }
397                }
398        }       
399
400        protected void updateTaskExecution(ExecTask execTask, SchedulingEventType schedEvType) {
401
402                if (execTask.getStatus() == DCWormsTags.INEXEC) {
403                        Executable exec = (Executable)execTask;
404                       
405                        try {
406                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
407                        } catch (Exception e) {
408                        }
409                       
410                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
411
412                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
413                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
414
415                        exec.setEstimatedDuration(phaseDuration);
416                       
417                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
418                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
419                        } else {
420                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
421                        }
422                }
423        }       
424
425        public double calculateTotalLoad(int size) {
426                // background load, defined during initialization
427                double load;
428                ResourceCalendar resCalendar = (ResourceCalendar) moduleList.getModule(ModuleType.RESOURCE_CALENDAR);
429                if (resCalendar == null)
430                        load = 0;
431                else
432                        load = resCalendar.getCurrentLoad();
433
434                int numberOfPE = 0;
435                try {
436                        for(ResourceUnit resUnit : getResourceManager().getPE()){
437                                numberOfPE = numberOfPE + resUnit.getAmount();
438                        }
439                } catch (Exception e) {
440                        numberOfPE = 1;
441                }
442                double tasksPerPE = (double) size / numberOfPE;
443                load += Math.min(1.0 - load, tasksPerPE);
444
445                return load;
446        }
447
448        public Accumulator getTotalLoad() {
449                return accTotalLoad;
450        }
451
452        protected void addTotalLoad(double load) {
453                accTotalLoad.add(load);
454        }
455       
456        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
457                        ExecTask task) {
458
459                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
460                LocalResourceManager resourceManager = getResourceManager();
461                if(resourceName != null){
462                        ComputingResource resource = null;
463                        try {
464                                resource = resourceManager.getResourceByName(resourceName);
465                        } catch (ResourceException e) {
466                                return null;
467                        }
468                        resourceManager = new LocalResourceManager(resource);
469                }
470
471                int cpuRequest;
472                try {
473                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
474                } catch (NoSuchFieldException e) {
475                        cpuRequest = 1;
476                }
477
478                if (cpuRequest != 0) {
479                       
480                        List<ResourceUnit> availableUnits = null;
481                        try {
482                                availableUnits = resourceManager.getPE();
483                        } catch (ResourceException e) {
484                                return null;
485                        }
486                       
487                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
488                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
489                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
490                                if(peUnit.getFreeAmount() > 0){
491                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
492                                        cpuRequest = cpuRequest - allocPE;
493                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
494                                }       
495                        }
496                       
497                        if(cpuRequest > 0){
498                                return null;
499                        }
500                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
501                }
502
503                return  map;
504        }
505       
506        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
507                updateProcessingProgress();
508                registerWorkloadUnit(wu);
509        }
510
511        private void registerWorkloadUnit(WorkloadUnit wu){
512                if(!wu.isRegistered()){
513                        wu.register(jobRegistry);
514                }
515                wu.accept(getWorkloadUnitHandler());
516        }
517       
518        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
519               
520                public void handleJob(JobInterface<?> job){
521
522                        if (log.isInfoEnabled())
523                                log.info("Received job " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
524
525                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
526                        jobsList.add(job);
527                        TaskListImpl availableTasks = new TaskListImpl();
528                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
529                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
530                                availableTasks.add(task);
531                        }
532
533                        for(TaskInterface<?> task: availableTasks){     
534                                registerWorkloadUnit(task);
535                        }
536                }
537               
538                public void handleTask(TaskInterface<?> t){
539                        Task task = (Task)t;
540                        List<AbstractProcesses> processes = task.getProcesses();
541
542                        if(processes == null || processes.size() == 0){
543                                Executable exec = new Executable(task);
544                                registerWorkloadUnit(exec);
545                        } else {
546                                for(int j = 0; j < processes.size(); j++){
547                                        AbstractProcesses procesesSet = processes.get(j);
548                                        Executable exec = new Executable(task, procesesSet);
549                                        registerWorkloadUnit(exec);
550                                }
551                        }
552                }
553               
554                public void handleExecutable(ExecTask task){
555                       
556                        Executable exec = (Executable) task;
557                        jobRegistry.addExecTask(exec);
558                       
559                        exec.trackResource(scheduler.get_name());
560                        Scheduler parentScheduler = scheduler.getParent();
561                        List<String> visitedResource = exec.getVisitedResources();
562                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
563                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
564                                exec.trackResource(parentScheduler.get_name());
565                                parentScheduler = parentScheduler.getParent();
566                        }
567                        exec.setSchedulerName(scheduler.get_id());
568                       
569                        TaskList newTasks = new TaskListImpl();
570                        newTasks.add(exec);
571               
572                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
573
574                        if (exec.getStatus() == DCWormsTags.QUEUED) {
575                                sendExecutableReadyEvent(exec);
576                        }
577                }
578        }
579
580        public WorkloadUnitHandler getWorkloadUnitHandler() {
581                return new LocalWorkloadUnitHandler();
582        }
583       
584       
585        public LocalResourceManager getResourceManager() {
586                if (resourceManager instanceof ResourceManager)
587                        return (LocalResourceManager) resourceManager;
588                else
589                        return null;
590        }
591       
592}
Note: See TracBrowser for help on using the repository browser.