source: DCWoRMS/branches/coolemall/src/schedframe/scheduling/policy/local/LocalManagementSystem.java @ 1298

Revision 1298, 21.0 KB checked in by wojtekp, 11 years ago (diff)
  • Property svn:mime-type set to text/plain
Line 
1package schedframe.scheduling.policy.local;
2
3import dcworms.schedframe.scheduling.ExecTask;
4import dcworms.schedframe.scheduling.Executable;
5import eduni.simjava.Sim_event;
6import eduni.simjava.Sim_system;
7import gridsim.dcworms.DCWormsTags;
8import gridsim.dcworms.filter.ExecTaskFilter;
9
10import java.util.ArrayList;
11import java.util.HashMap;
12import java.util.Iterator;
13import java.util.List;
14import java.util.Map;
15
16import org.apache.commons.lang.ArrayUtils;
17import org.apache.commons.logging.Log;
18import org.apache.commons.logging.LogFactory;
19import org.joda.time.DateTime;
20import org.joda.time.DateTimeUtilsExt;
21import org.qcg.broker.schemas.schedulingplan.types.AllocationStatus;
22
23import qcg.shared.constants.BrokerConstants;
24import schedframe.SimulatedEnvironment;
25import schedframe.events.scheduling.SchedulingEvent;
26import schedframe.events.scheduling.SchedulingEventType;
27import schedframe.events.scheduling.StartTaskExecutionEvent;
28import schedframe.events.scheduling.TaskFinishedEvent;
29import schedframe.events.scheduling.TaskRequestedTimeExpiredEvent;
30import schedframe.exceptions.ResourceException;
31import schedframe.resources.StandardResourceType;
32import schedframe.resources.computing.ComputingResource;
33import schedframe.resources.computing.profiles.energy.EnergyEvent;
34import schedframe.resources.computing.profiles.energy.EnergyEventType;
35import schedframe.resources.units.PEUnit;
36import schedframe.resources.units.ProcessingElements;
37import schedframe.resources.units.ResourceUnit;
38import schedframe.resources.units.ResourceUnitName;
39import schedframe.resources.units.StandardResourceUnitName;
40import schedframe.scheduling.ResourceHistoryItem;
41import schedframe.scheduling.Scheduler;
42import schedframe.scheduling.TaskList;
43import schedframe.scheduling.TaskListImpl;
44import schedframe.scheduling.UsedResourcesList;
45import schedframe.scheduling.WorkloadUnitHandler;
46import schedframe.scheduling.manager.resources.LocalResourceManager;
47import schedframe.scheduling.manager.resources.ManagedResources;
48import schedframe.scheduling.manager.resources.ResourceManager;
49import schedframe.scheduling.plan.AllocationInterface;
50import schedframe.scheduling.plan.ScheduledTaskInterface;
51import schedframe.scheduling.plan.SchedulingPlanInterface;
52import schedframe.scheduling.plugin.SchedulingPlugin;
53import schedframe.scheduling.plugin.estimation.ExecutionTimeEstimationPlugin;
54import schedframe.scheduling.plugin.grid.ModuleListImpl;
55import schedframe.scheduling.policy.AbstractManagementSystem;
56import schedframe.scheduling.queue.TaskQueueList;
57import schedframe.scheduling.tasks.AbstractProcesses;
58import schedframe.scheduling.tasks.Job;
59import schedframe.scheduling.tasks.JobInterface;
60import schedframe.scheduling.tasks.Task;
61import schedframe.scheduling.tasks.TaskInterface;
62import schedframe.scheduling.tasks.WorkloadUnit;
63import simulator.DCWormsConstants;
64import simulator.stats.GSSAccumulator;
65import simulator.utils.DoubleMath;
66
67public class LocalManagementSystem extends AbstractManagementSystem {
68
69        private Log log = LogFactory.getLog(LocalManagementSystem.class);
70
71        protected double lastUpdateTime;
72
73        public LocalManagementSystem(String providerId, String entityName, SchedulingPlugin schedPlugin,
74                        ExecutionTimeEstimationPlugin execTimeEstimationPlugin, TaskQueueList queues)
75                        throws Exception {
76
77                super(providerId, entityName, execTimeEstimationPlugin, queues);
78               
79                if (schedPlugin == null) {
80                        throw new Exception("Can not create local scheduling plugin instance.");
81                }
82                this.schedulingPlugin =  schedPlugin;
83                this.moduleList = new ModuleListImpl(1);
84        }
85
86        public void init(Scheduler sched, ManagedResources managedResources) {
87                super.init(sched, managedResources);
88        }
89
90        public void processEvent(Sim_event ev) {
91
92                updateProcessingProgress();
93
94                int tag = ev.get_tag();
95
96                switch (tag) {
97
98                case DCWormsTags.TIMER:
99                        if (pluginSupportsEvent(tag)) {
100                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TIMER);
101                                SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
102                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
103                                executeSchedulingPlan(decision);
104                        }
105                        sendTimerEvent();
106                        break;
107
108                case DCWormsTags.TASK_READY_FOR_EXECUTION:
109                        ExecTask execTask = (ExecTask) ev.get_data();
110                        try {
111                                execTask.setStatus(DCWormsTags.READY);
112                                if (pluginSupportsEvent(tag)) {
113                                        SchedulingEvent event = new StartTaskExecutionEvent(execTask.getJobId(), execTask.getId());
114                                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
115                                                        queues,  getJobRegistry(), getResourceManager(), moduleList);
116                                        executeSchedulingPlan(decision);
117                                }
118                        } catch (Exception e) {
119                                e.printStackTrace();
120                        }
121                        break;
122
123                case DCWormsTags.TASK_EXECUTION_FINISHED:
124                        execTask = (ExecTask) ev.get_data();
125                        if (execTask.getStatus() == DCWormsTags.INEXEC) {
126                               
127                                finalizeExecutable(execTask);
128                                sendFinishedWorkloadUnit(execTask);
129                                log.debug(execTask.getJobId() + "_" + execTask.getId() + " finished execution on " + new DateTime());
130                                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
131                                if (pluginSupportsEvent(tag)) {
132                                        SchedulingEvent event = new TaskFinishedEvent(execTask.getJobId(), execTask.getId());
133                                        SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
134                                                        queues, getJobRegistry(), getResourceManager(), moduleList);
135                                        executeSchedulingPlan(decision);
136                                }
137                        }
138
139                        Job job = jobRegistry.getJob(execTask.getJobId());
140                        if(!job.isFinished()){
141                                getWorkloadUnitHandler().handleJob(job);
142                        }
143                        //SUPPORTS PRECEDING CONSTRAINST COMING BOTH FROM SWF FILES
144                        /*else {
145                                try {
146                                        job.setStatus((int)BrokerConstants.JOB_STATUS_FINISHED);
147                                } catch (Exception e) {
148
149                                }
150                                for(JobInterface<?> j: jobRegistry.getJobs((int)BrokerConstants.JOB_STATUS_SUBMITTED)){
151                                        getWorkloadUnitHandler().handleJob(j); 
152                                }
153                        }*/
154                        break;
155                       
156                case DCWormsTags.TASK_REQUESTED_TIME_EXPIRED:
157                        execTask = (Executable) ev.get_data();
158                        if (pluginSupportsEvent(tag)) {
159                                SchedulingEvent event = new TaskRequestedTimeExpiredEvent(execTask.getJobId(), execTask.getId());
160                                SchedulingPlanInterface<?> decision = schedulingPlugin.schedule(event,
161                                                queues, getJobRegistry(), getResourceManager(), moduleList);
162                                executeSchedulingPlan(decision);
163                        }
164                        break;
165                       
166                case DCWormsTags.UPDATE_PROCESSING:
167                        updateProcessingTimes(ev);
168                        break;
169                       
170                case DCWormsTags.TASK_EXECUTION_CHANGED:
171                        execTask = (ExecTask) ev.get_data();
172                        updateTaskExecution(execTask, SchedulingEventType.RESOURCE_STATE_CHANGED);
173                        break;
174                       
175                case DCWormsTags.POWER_LIMIT_EXCEEDED:
176                        if (pluginSupportsEvent(tag)) {
177                                SchedulingEvent event = new SchedulingEvent(SchedulingEventType.POWER_LIMIT_EXCEEDED);
178                                schedulingPlugin.schedule(event,
179                                                queues,  getJobRegistry(), getResourceManager(), moduleList);
180                        }
181                        break;
182                }
183        }
184
185        public void notifyReturnedWorkloadUnit(WorkloadUnit wu) {
186                if (pluginSupportsEvent(DCWormsTags.TASK_EXECUTION_FINISHED)) {
187                        SchedulingEvent event = new SchedulingEvent(SchedulingEventType.TASK_FINISHED);
188                        SchedulingPlanInterface<?> decision =  schedulingPlugin.schedule(event,
189                                        queues, getJobRegistry(), getResourceManager(), moduleList);
190                        executeSchedulingPlan(decision);
191                }
192                //if(scheduler.getParent() != null){
193                        sendFinishedWorkloadUnit(wu);
194                //}
195        }
196       
197        protected void executeSchedulingPlan(SchedulingPlanInterface<?> decision) {
198
199                ArrayList<ScheduledTaskInterface<?>> taskSchedulingDecisions = decision.getTasks();
200                for (int i = 0; i < taskSchedulingDecisions.size(); i++) {
201                        ScheduledTaskInterface<?> taskDecision = taskSchedulingDecisions.get(i);
202
203                        if (taskDecision.getStatus() == AllocationStatus.REJECTED) {
204                                continue;
205                        }
206
207                        ArrayList<AllocationInterface<?>> allocations = taskDecision.getAllocations();
208
209                        TaskInterface<?> task = taskDecision.getTask();
210                        for (int j = 0; j < allocations.size(); j++) {
211
212                                AllocationInterface<?> allocation = allocations.get(j);
213                                if (allocation.getRequestedResources() == null || allocation.getRequestedResources().size() > 0) {
214                                        ExecTask exec = (ExecTask) task;                                       
215                                        executeTask(exec, allocation.getRequestedResources());
216                                } else if(resourceManager.getSchedulerName(allocation.getProviderName()) != null){
217                                        allocation.setProviderName(resourceManager.getSchedulerName(allocation.getProviderName()));
218                                        submitTask(task, allocation);
219                                } else {
220                                        ExecTask exec = (ExecTask) task;
221                                        executeTask(exec, chooseResourcesForExecution(allocation.getProviderName(), exec));
222                                }
223                        }
224                }
225        }
226
227        protected void executeTask(ExecTask task, Map<ResourceUnitName, ResourceUnit> choosenResources) {
228
229                Executable exec = (Executable)task;
230                boolean allocationStatus = getAllocationManager().allocateResources(choosenResources);
231                if(allocationStatus == false){
232                        log.info("Task " + task.getJobId() + "_" + task.getId() + " requires more resources than is available at this moment.");
233                        return;
234                }
235
236                removeFromQueue(task);
237
238                log.debug(task.getJobId() + "_" + task.getId() + " starts executing on " + new DateTime());
239
240                //if (phaseDuration < 0.0)
241                //      return;
242
243                //exec.setEstimatedDuration(exec.getEstimatedDuration() + phaseDuration);
244                DateTime currentTime = new DateTime();
245                ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
246                exec.addUsedResources(resHistItem);
247               
248                try {
249                        exec.setStatus(DCWormsTags.INEXEC);
250                } catch (Exception e) {
251                        // TODO Auto-generated catch block
252                        e.printStackTrace();
253                }
254               
255                updateTaskExecution(exec, SchedulingEventType.START_TASK_EXECUTION);
256                //scheduler.sendInternal(time, DCWormsTags.TASK_EXECUTION_FINISHED, exec);
257
258                try {
259                        long expectedDuration = exec.getExpectedDuration().getMillis() / 1000;
260                        scheduler.sendInternal(expectedDuration, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
261                } catch (NoSuchFieldException e) {
262                        //double t = exec.getEstimatedDuration();
263                        //scheduler.sendInternal(t, DCWormsTags.TASK_REQUESTED_TIME_EXPIRED, exec);
264                }
265
266                log.info(DCWormsConstants.USAGE_MEASURE_NAME + ": " + calculateTotalLoad());
267               
268                PEUnit peUnit = (PEUnit)choosenResources.get(StandardResourceUnitName.PE);
269               
270                notifyComputingResources(peUnit, EnergyEventType.TASK_STARTED, exec);
271               
272        }
273       
274        protected void finalizeExecutable(ExecTask execTask){
275               
276                Executable exec = (Executable)execTask;
277                exec.finalizeExecutable();
278               
279                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_REQUESTED_TIME_EXPIRED);
280                scheduler.sim_cancel(filter, null);
281               
282                Task task;
283                Job job = jobRegistry.getJob(exec.getJobId());
284                try {
285                        task = job.getTask(exec.getTaskId());
286                } catch (NoSuchFieldException e) {
287                        return;
288                }
289                if(exec.getProcessesId() == null){
290                        try {
291                                task.setStatus(exec.getStatus());
292                        } catch (Exception e) {
293                                e.printStackTrace();
294                        }
295                } else {
296                        List<AbstractProcesses> processesList = task.getProcesses();
297                        for(int i = 0; i < processesList.size(); i++){
298                                AbstractProcesses processes = processesList.get(i);
299                                if(processes.getId().equals(exec.getProcessesId())){
300                                        processes.setStatus(exec.getStatus());
301                                        break;
302                                }
303                        }
304                }
305               
306                UsedResourcesList lastUsedList = exec.getUsedResources();
307                Map<ResourceUnitName, ResourceUnit> lastUsed = lastUsedList.getLast()
308                                .getResourceUnits();
309                getAllocationManager().freeResources(lastUsed);
310               
311                PEUnit peUnit = (PEUnit)lastUsed.get(StandardResourceUnitName.PE);
312                notifyComputingResources(peUnit, EnergyEventType.TASK_FINISHED, exec);
313        }
314       
315        protected void updateProcessingProgress() {
316                double timeSpan = DoubleMath.subtract(Sim_system.clock(), lastUpdateTime);
317                if (timeSpan <= 0.0) {
318                        // don't update when nothing changed
319                        return;
320                }
321                lastUpdateTime = Sim_system.clock();
322                Iterator<ExecTask> iter = jobRegistry.getRunningTasks().iterator();
323                while (iter.hasNext()) {
324                        ExecTask task = iter.next();
325                        Executable exec = (Executable)task;
326                        if( exec.getUsedResources().size() > 1){         
327                                exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / exec.getEstimatedDuration() * (1.0 - exec.getCompletionPercentage()/100.0)));
328                        }
329                        else {
330                                exec.setCompletionPercentage(exec.getCompletionPercentage() + 100 * (timeSpan / exec.getEstimatedDuration()));
331                        }
332                }
333        }
334       
335        private void notifyComputingResources(PEUnit peUnit, EnergyEventType eventType, Object obj){
336                if(peUnit instanceof ProcessingElements){
337                        ProcessingElements pes = (ProcessingElements) peUnit;
338                        for (ComputingResource resource : pes) {
339                                resource.handleEvent(new EnergyEvent(eventType, obj));
340                                //DataCenterWorkloadSimulator.getEventManager().sendToResources(resource.getType(), 0, new EnergyEvent(eventType, obj));
341                        }
342                        /*try {
343                                for (ComputingResource resource : resourceManager.getResourcesOfType(pes.get(0).getType())) {
344                                        resource.handleEvent(new EnergyEvent(eventType, obj));
345                                }
346                        } catch (ResourceException e) {
347                                // TODO Auto-generated catch block
348                                e.printStackTrace();
349                        }*/
350                } else {
351                        ComputingResource resource = null;
352                        try {
353                                resource = SimulatedEnvironment.getComputingResourceByName(peUnit.getResourceId());
354                        } catch (ResourceException e) {
355                                return;
356                        }
357                        resource.handleEvent(new EnergyEvent(eventType, obj));
358                }
359        }
360
361        protected void updateProcessingTimes(Sim_event ev) {
362                for (ExecTask execTask : jobRegistry.getRunningTasks()) {
363                        Executable exec = (Executable)execTask;
364
365                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
366                        double lastTimeStamp = exec.getUsedResources().getLast().getTimeStamp().getMillis()/1000;
367                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(SchedulingEventType.RESOURCE_STATE_CHANGED),
368                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
369
370                        if(DoubleMath.subtract((lastTimeStamp + exec.getEstimatedDuration()), (new DateTime().getMillis()/1000 + phaseDuration)) == 0.0){
371                                continue;
372                        }
373
374                        exec.setEstimatedDuration(phaseDuration);
375                        DateTime currentTime = new DateTime();
376                        ResourceHistoryItem resHistItem = new ResourceHistoryItem(choosenResources, currentTime);
377                        exec.addUsedResources(resHistItem);
378                       
379                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
380                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_FINISHED);
381                                scheduler.sim_cancel(filter, null);
382                                scheduler.sendInternal(phaseDuration , DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
383                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
384                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
385                        } else{
386                                ExecTaskFilter filter = new ExecTaskFilter(exec.getUniqueId(), DCWormsTags.TASK_EXECUTION_CHANGED);
387                                scheduler.sim_cancel(filter, null);
388                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
389                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
390                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
391                        }
392                }
393        }       
394
395        protected void updateTaskExecution(ExecTask execTask, SchedulingEventType schedEvType) {
396
397                if (execTask.getStatus() == DCWormsTags.INEXEC) {
398                        Executable exec = (Executable)execTask;
399                       
400                        try {
401                                exec.setStatus(DCWormsTags.NEW_EXEC_PHASE);
402                        } catch (Exception e) {
403                        }
404                       
405                        Map<ResourceUnitName, ResourceUnit> choosenResources = exec.getUsedResources().getLast().getResourceUnits();
406
407                        int phaseDuration = Double.valueOf(execTimeEstimationPlugin.execTimeEstimation(new SchedulingEvent(schedEvType),
408                                        execTask, choosenResources, exec.getCompletionPercentage())).intValue();
409
410                        exec.setEstimatedDuration(phaseDuration);
411                       
412                        if(exec.getResourceConsumptionProfile().getCurrentResourceConsumption() == exec.getResourceConsumptionProfile().getResourceConsumptionList().getLast()){
413                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_FINISHED, execTask);
414                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
415                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
416                        } else {
417                                scheduler.sendInternal(phaseDuration, DCWormsTags.TASK_EXECUTION_CHANGED, execTask);
418                                PEUnit peUnit = (PEUnit)exec.getUsedResources().getLast().getResourceUnits().get(StandardResourceUnitName.PE);
419                                notifyComputingResources(peUnit, EnergyEventType.RESOURCE_UTILIZATION_CHANGED, exec);
420                        }
421                }
422        }       
423
424        public double calculateTotalLoad() {
425
426                GSSAccumulator loadAcc = new GSSAccumulator();
427                try {
428                        for(ComputingResource compRes: getResourceManager().getResourcesOfType(StandardResourceType.Node)){
429                                loadAcc.add(compRes.getLoadInterface().getRecentUtilization().getValue());                             
430                        }
431                } catch (ResourceException e) {
432                        // TODO Auto-generated catch block
433                        e.printStackTrace();
434                }
435               
436                return loadAcc.getMean();
437        }
438
439        private Map<ResourceUnitName, ResourceUnit> chooseResourcesForExecution(String resourceName,
440                        ExecTask task) {
441
442                Map<ResourceUnitName, ResourceUnit> map = new HashMap<ResourceUnitName, ResourceUnit>();
443                LocalResourceManager resourceManager = getResourceManager();
444                if(resourceName != null){
445                        ComputingResource resource = null;
446                        try {
447                                resource = resourceManager.getResourceByName(resourceName);
448                        } catch (ResourceException e) {
449                                return null;
450                        }
451                        resourceManager = new LocalResourceManager(resource);
452                }
453
454                int cpuRequest;
455                try {
456                        cpuRequest = Double.valueOf(task.getCpuCntRequest()).intValue();
457                } catch (NoSuchFieldException e) {
458                        cpuRequest = 1;
459                }
460
461                if (cpuRequest != 0) {
462                       
463                        List<ResourceUnit> availableUnits = null;
464                        try {
465                                availableUnits = resourceManager.getPE();
466                        } catch (ResourceException e) {
467                                return null;
468                        }
469                       
470                        List<ResourceUnit> choosenPEUnits = new ArrayList<ResourceUnit>();
471                        for (int i = 0; i < availableUnits.size() && cpuRequest > 0; i++) {
472                                PEUnit peUnit = (PEUnit) availableUnits.get(i);
473                                if(peUnit.getFreeAmount() > 0){
474                                        int allocPE = Math.min(peUnit.getFreeAmount(), cpuRequest);
475                                        cpuRequest = cpuRequest - allocPE;
476                                        choosenPEUnits.add(peUnit.replicate(allocPE)); 
477                                }       
478                        }
479                       
480                        if(cpuRequest > 0){
481                                return null;
482                        }
483                        map.put(StandardResourceUnitName.PE, choosenPEUnits.get(0));
484                }
485
486                return  map;
487        }
488       
489        public void notifySubmittedWorkloadUnit(WorkloadUnit wu, boolean ack) {
490                updateProcessingProgress();
491                if (log.isInfoEnabled())
492                        log.info("Received job " + wu.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
493                registerWorkloadUnit(wu);
494        }
495
496        private void registerWorkloadUnit(WorkloadUnit wu){
497                if(!wu.isRegistered()){
498                        wu.register(jobRegistry);
499                }
500                wu.accept(getWorkloadUnitHandler());
501        }
502       
503        class LocalWorkloadUnitHandler implements WorkloadUnitHandler{
504               
505                public void handleJob(JobInterface<?> job){
506
507                        if (log.isInfoEnabled())
508                                log.info("Handle " + job.getId() + " at " + new DateTime(DateTimeUtilsExt.currentTimeMillis()));
509                       
510                        try {
511                                if(job.getStatus() == BrokerConstants.JOB_STATUS_UNSUBMITTED)
512                                        job.setStatus((int)BrokerConstants.JOB_STATUS_SUBMITTED);
513                        } catch (Exception e) {
514                                e.printStackTrace();
515                        }
516                        List<JobInterface<?>> jobsList = new ArrayList<JobInterface<?>>();
517                        jobsList.add(job);
518                        TaskListImpl availableTasks = new TaskListImpl();
519                        for(Task task: jobRegistry.getAvailableTasks(jobsList)){
520                                task.setStatus((int)BrokerConstants.TASK_STATUS_QUEUED);
521                                availableTasks.add(task);
522                        }
523
524                        for(TaskInterface<?> task: availableTasks){     
525                                registerWorkloadUnit(task);
526                        }
527                }
528               
529                public void handleTask(TaskInterface<?> t){
530                        Task task = (Task)t;
531                        List<AbstractProcesses> processes = task.getProcesses();
532
533                        if(processes == null || processes.size() == 0){
534                                Executable exec = new Executable(task);
535                                registerWorkloadUnit(exec);
536                        } else {
537                                for(int j = 0; j < processes.size(); j++){
538                                        AbstractProcesses procesesSet = processes.get(j);
539                                        Executable exec = new Executable(task, procesesSet);
540                                        registerWorkloadUnit(exec);
541                                }
542                        }
543                }
544               
545                public void handleExecutable(ExecTask task){
546                       
547                        Executable exec = (Executable) task;
548                        jobRegistry.addExecTask(exec);
549                       
550                        exec.trackResource(scheduler.get_name());
551                        Scheduler parentScheduler = scheduler.getParent();
552                        List<String> visitedResource = exec.getVisitedResources();
553                        String [] visitedResourcesArray = visitedResource.toArray(new String[visitedResource.size()]);
554                        while (parentScheduler != null && !ArrayUtils.contains(visitedResourcesArray, parentScheduler.get_name())) {
555                                exec.trackResource(parentScheduler.get_name());
556                                parentScheduler = parentScheduler.getParent();
557                        }
558                        exec.setSchedulerName(scheduler.get_id());
559                       
560                        TaskList newTasks = new TaskListImpl();
561                        newTasks.add(exec);
562               
563                        schedulingPlugin.placeTasksInQueues(newTasks, queues, getResourceManager(), moduleList);
564
565                        if (exec.getStatus() == DCWormsTags.QUEUED) {
566                                sendExecutableReadyEvent(exec);
567                        }
568                }
569        }
570
571        public WorkloadUnitHandler getWorkloadUnitHandler() {
572                return new LocalWorkloadUnitHandler();
573        }
574       
575       
576        public LocalResourceManager getResourceManager() {
577                if (resourceManager instanceof ResourceManager)
578                        return (LocalResourceManager) resourceManager;
579                else
580                        return null;
581        }
582       
583}
Note: See TracBrowser for help on using the repository browser.